Bug 736393 - Don't abort on store failure. r=rnewman

--HG--
extra : rebase_source : dd9d3b27397739265bcc7a46167e6cffbe4afc23
This commit is contained in:
Nick Alexander 2012-05-31 12:21:08 -07:00
parent 220707d69a
commit d49f32eda3
27 changed files with 430 additions and 255 deletions

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,12 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
/**
* A previous POST failed, so we won't send any more records this session.
*/
public class Server11PreviousPostFailedException extends SyncException {
private static final long serialVersionUID = -3582490631414624310L;
}

View File

@ -0,0 +1,12 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
/**
* The server rejected a record in its "failure" array.
*/
public class Server11RecordPostFailedException extends SyncException {
private static final long serialVersionUID = -8517471217486190314L;
}

View File

@ -175,10 +175,10 @@ public class Crypto5MiddlewareRepositorySession extends MiddlewareRepositorySess
try { try {
rec.encrypt(); rec.encrypt();
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} catch (CryptoException e) { } catch (CryptoException e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
// Allow the inner session to do delegate handling. // Allow the inner session to do delegate handling.

View File

@ -0,0 +1,11 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
import org.mozilla.gecko.sync.SyncException;
public class FetchFailedException extends SyncException {
private static final long serialVersionUID = -7533105300182522946L;
}

View File

@ -21,6 +21,8 @@ import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.ExtendedJSONObject; import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.Logger; import org.mozilla.gecko.sync.Logger;
import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
import org.mozilla.gecko.sync.Server11RecordPostFailedException;
import org.mozilla.gecko.sync.UnexpectedJSONException; import org.mozilla.gecko.sync.UnexpectedJSONException;
import org.mozilla.gecko.sync.crypto.KeyBundle; import org.mozilla.gecko.sync.crypto.KeyBundle;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest; import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
@ -28,6 +30,7 @@ import org.mozilla.gecko.sync.net.SyncStorageRequest;
import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate; import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
import org.mozilla.gecko.sync.net.SyncStorageResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate; import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
@ -311,7 +314,25 @@ public class Server11RepositorySession extends RepositorySession {
} }
protected Object recordsBufferMonitor = new Object(); protected Object recordsBufferMonitor = new Object();
/**
* Data of outbound records.
* <p>
* We buffer the data (rather than the <code>Record</code>) so that we can
* flush the buffer based on outgoing transmission size.
* <p>
* Access should be synchronized on <code>recordsBufferMonitor</code>.
*/
protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>(); protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>();
/**
* GUIDs of outbound records.
* <p>
* Used to fail entire outgoing uploads.
* <p>
* Access should be synchronized on <code>recordsBufferMonitor</code>.
*/
protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>();
protected int byteCount = PER_BATCH_OVERHEAD; protected int byteCount = PER_BATCH_OVERHEAD;
@Override @Override
@ -340,6 +361,7 @@ public class Server11RepositorySession extends RepositorySession {
flush(); flush();
} }
recordsBuffer.add(json); recordsBuffer.add(json);
recordGuidsBuffer.add(record.guid);
byteCount += PER_RECORD_OVERHEAD + delta; byteCount += PER_RECORD_OVERHEAD + delta;
} }
} }
@ -349,10 +371,12 @@ public class Server11RepositorySession extends RepositorySession {
protected void flush() { protected void flush() {
if (recordsBuffer.size() > 0) { if (recordsBuffer.size() > 0) {
final ArrayList<byte[]> outgoing = recordsBuffer; final ArrayList<byte[]> outgoing = recordsBuffer;
final ArrayList<String> outgoingGuids = recordGuidsBuffer;
RepositorySessionStoreDelegate uploadDelegate = this.delegate; RepositorySessionStoreDelegate uploadDelegate = this.delegate;
storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, byteCount)); storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
recordsBuffer = new ArrayList<byte[]>(); recordsBuffer = new ArrayList<byte[]>();
recordGuidsBuffer = new ArrayList<String>();
byteCount = PER_BATCH_OVERHEAD; byteCount = PER_BATCH_OVERHEAD;
} }
} }
@ -377,6 +401,20 @@ public class Server11RepositorySession extends RepositorySession {
} }
} }
/**
* <code>true</code> if a record upload has failed this session.
* <p>
* This is only set in begin and possibly by <code>RecordUploadRunnable</code>.
* Since those are executed serially, we can use an unsynchronized
* volatile boolean here.
*/
protected volatile boolean recordUploadFailed;
public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
recordUploadFailed = false;
super.begin(delegate);
}
/** /**
* Make an HTTP request, and convert HTTP request delegate callbacks into * Make an HTTP request, and convert HTTP request delegate callbacks into
* store callbacks within the context of this RepositorySession. * store callbacks within the context of this RepositorySession.
@ -388,15 +426,18 @@ public class Server11RepositorySession extends RepositorySession {
public final String LOG_TAG = "RecordUploadRunnable"; public final String LOG_TAG = "RecordUploadRunnable";
private ArrayList<byte[]> outgoing; private ArrayList<byte[]> outgoing;
private ArrayList<String> outgoingGuids;
private long byteCount; private long byteCount;
public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate, public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
ArrayList<byte[]> outgoing, ArrayList<byte[]> outgoing,
ArrayList<String> outgoingGuids,
long byteCount) { long byteCount) {
Logger.info(LOG_TAG, "Preparing record upload for " + Logger.info(LOG_TAG, "Preparing record upload for " +
outgoing.size() + " records (" + outgoing.size() + " records (" +
byteCount + " bytes)."); byteCount + " bytes).");
this.outgoing = outgoing; this.outgoing = outgoing;
this.outgoingGuids = outgoingGuids;
this.byteCount = byteCount; this.byteCount = byteCount;
} }
@ -419,7 +460,7 @@ public class Server11RepositorySession extends RepositorySession {
body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null. body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
} catch (Exception e) { } catch (Exception e) {
Logger.error(LOG_TAG, "Got exception parsing POST success body.", e); Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
// TODO this.handleRequestError(e);
return; return;
} }
@ -437,21 +478,34 @@ public class Server11RepositorySession extends RepositorySession {
try { try {
JSONArray success = body.getArray("success"); JSONArray success = body.getArray("success");
ExtendedJSONObject failed = body.getObject("failed");
if ((success != null) && if ((success != null) &&
(success.size() > 0)) { (success.size() > 0)) {
Logger.debug(LOG_TAG, "Successful records: " + success.toString()); Logger.debug(LOG_TAG, "Successful records: " + success.toString());
// TODO: how do we notify without the whole record? for (Object o : success) {
try {
delegate.onRecordStoreSucceeded((String) o);
} catch (ClassCastException e) {
Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
// Not much to be done.
}
}
long normalizedTimestamp = getNormalizedTimestamp(response); long normalizedTimestamp = getNormalizedTimestamp(response);
Logger.debug(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp); Logger.debug(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
bumpUploadTimestamp(normalizedTimestamp); bumpUploadTimestamp(normalizedTimestamp);
} }
success = null; // Want to GC this ASAP.
ExtendedJSONObject failed = body.getObject("failed");
if ((failed != null) && if ((failed != null) &&
(failed.object.size() > 0)) { (failed.object.size() > 0)) {
Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString()); Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
// TODO: notify. Exception ex = new Server11RecordPostFailedException();
for (String guid : failed.keySet()) {
delegate.onRecordStoreFailed(ex, guid);
}
} }
failed = null; // Want to GC this ASAP.
} catch (UnexpectedJSONException e) { } catch (UnexpectedJSONException e) {
Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e); Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
// TODO // TODO
@ -462,7 +516,6 @@ public class Server11RepositorySession extends RepositorySession {
@Override @Override
public void handleRequestFailure(SyncStorageResponse response) { public void handleRequestFailure(SyncStorageResponse response) {
// TODO: ensure that delegate methods don't get called more than once.
// TODO: call session.interpretHTTPFailure. // TODO: call session.interpretHTTPFailure.
this.handleRequestError(new HTTPFailureException(response)); this.handleRequestError(new HTTPFailureException(response));
} }
@ -470,7 +523,14 @@ public class Server11RepositorySession extends RepositorySession {
@Override @Override
public void handleRequestError(final Exception ex) { public void handleRequestError(final Exception ex) {
Logger.warn(LOG_TAG, "Got request error: " + ex, ex); Logger.warn(LOG_TAG, "Got request error: " + ex, ex);
delegate.onRecordStoreFailed(ex);
recordUploadFailed = true;
ArrayList<String> failedOutgoingGuids = outgoingGuids;
outgoingGuids = null; // Want to GC this ASAP.
for (String guid : failedOutgoingGuids) {
delegate.onRecordStoreFailed(ex, guid);
}
return;
} }
public class ByteArraysContentProducer implements ContentProducer { public class ByteArraysContentProducer implements ContentProducer {
@ -520,6 +580,15 @@ public class Server11RepositorySession extends RepositorySession {
@Override @Override
public void run() { public void run() {
if (recordUploadFailed) {
Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying.");
Exception ex = new Server11PreviousPostFailedException();
for (String guid : outgoingGuids) {
delegate.onRecordStoreFailed(ex, guid);
}
return;
}
if (outgoing == null || if (outgoing == null ||
outgoing.size() == 0) { outgoing.size() == 0) {
Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately."); Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");

View File

@ -0,0 +1,11 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
import org.mozilla.gecko.sync.SyncException;
public class StoreFailedException extends SyncException {
private static final long serialVersionUID = 6080340122855859752L;
}

View File

@ -588,7 +588,7 @@ public class AndroidBrowserBookmarksRepositorySession extends AndroidBrowserRepo
try { try {
Uri recordURI = dbHelper.insert(toStore); Uri recordURI = dbHelper.insert(toStore);
if (recordURI == null) { if (recordURI == null) {
delegate.onRecordStoreFailed(new RuntimeException("Got null URI inserting folder with guid " + toStore.guid + ".")); delegate.onRecordStoreFailed(new RuntimeException("Got null URI inserting folder with guid " + toStore.guid + "."), record.guid);
return false; return false;
} }
toStore.androidID = ContentUris.parseId(recordURI); toStore.androidID = ContentUris.parseId(recordURI);
@ -596,11 +596,11 @@ public class AndroidBrowserBookmarksRepositorySession extends AndroidBrowserRepo
updateBookkeeping(toStore); updateBookkeeping(toStore);
} catch (Exception e) { } catch (Exception e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return false; return false;
} }
trackRecord(toStore); trackRecord(toStore);
delegate.onRecordStoreSucceeded(toStore); delegate.onRecordStoreSucceeded(record.guid);
return true; return true;
} }
@ -623,12 +623,14 @@ public class AndroidBrowserBookmarksRepositorySession extends AndroidBrowserRepo
// Something failed; most pessimistic action is to declare that all insertions failed. // Something failed; most pessimistic action is to declare that all insertions failed.
// TODO: perform the bulkInsert in a transaction and rollback unless all insertions succeed? // TODO: perform the bulkInsert in a transaction and rollback unless all insertions succeed?
for (Record failed : toStores) { for (Record failed : toStores) {
delegate.onRecordStoreFailed(new RuntimeException("Possibly failed to bulkInsert non-folder with guid " + failed.guid + ".")); delegate.onRecordStoreFailed(new RuntimeException("Possibly failed to bulkInsert non-folder with guid " + failed.guid + "."), failed.guid);
} }
return; return;
} }
} catch (NullCursorException e) { } catch (NullCursorException e) {
delegate.onRecordStoreFailed(e); // TODO: include which records failed. for (Record failed : toStores) {
delegate.onRecordStoreFailed(e, failed.guid);
}
return; return;
} }
@ -640,7 +642,7 @@ public class AndroidBrowserBookmarksRepositorySession extends AndroidBrowserRepo
Logger.warn(LOG_TAG, "Got exception updating bookkeeping of non-folder with guid " + succeeded.guid + ".", e); Logger.warn(LOG_TAG, "Got exception updating bookkeeping of non-folder with guid " + succeeded.guid + ".", e);
} }
trackRecord(succeeded); trackRecord(succeeded);
delegate.onRecordStoreSucceeded(succeeded); delegate.onRecordStoreSucceeded(succeeded.guid);
} }
} }

View File

@ -214,7 +214,7 @@ public class AndroidBrowserHistoryRepositorySession extends AndroidBrowserReposi
// Something failed; most pessimistic action is to declare that all insertions failed. // Something failed; most pessimistic action is to declare that all insertions failed.
// TODO: perform the bulkInsert in a transaction and rollback unless all insertions succeed? // TODO: perform the bulkInsert in a transaction and rollback unless all insertions succeed?
for (HistoryRecord failed : outgoing) { for (HistoryRecord failed : outgoing) {
delegate.onRecordStoreFailed(new RuntimeException("Failed to insert history item with guid " + failed.guid + ".")); delegate.onRecordStoreFailed(new RuntimeException("Failed to insert history item with guid " + failed.guid + "."), failed.guid);
} }
return; return;
} }
@ -234,7 +234,7 @@ public class AndroidBrowserHistoryRepositorySession extends AndroidBrowserReposi
throw e; throw e;
} }
trackRecord(succeeded); trackRecord(succeeded);
delegate.onRecordStoreSucceeded(succeeded); // At this point, we are really inserted. delegate.onRecordStoreSucceeded(succeeded.guid); // At this point, we are really inserted.
} }
} }

View File

@ -384,7 +384,7 @@ public abstract class AndroidBrowserRepositorySession extends StoreTrackingRepos
public void run() { public void run() {
if (!isActive()) { if (!isActive()) {
Logger.warn(LOG_TAG, "AndroidBrowserRepositorySession is inactive. Store failing."); Logger.warn(LOG_TAG, "AndroidBrowserRepositorySession is inactive. Store failing.");
delegate.onRecordStoreFailed(new InactiveSessionException(null)); delegate.onRecordStoreFailed(new InactiveSessionException(null), record.guid);
return; return;
} }
@ -503,24 +503,24 @@ public abstract class AndroidBrowserRepositorySession extends StoreTrackingRepos
// of reconcileRecords. // of reconcileRecords.
Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid + Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid +
"(" + replaced.androidID + ")"); "(" + replaced.androidID + ")");
delegate.onRecordStoreSucceeded(replaced); delegate.onRecordStoreSucceeded(replaced.guid);
return; return;
} catch (MultipleRecordsForGuidException e) { } catch (MultipleRecordsForGuidException e) {
Logger.error(LOG_TAG, "Multiple records returned for given guid: " + record.guid); Logger.error(LOG_TAG, "Multiple records returned for given guid: " + record.guid);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} catch (NoGuidForIdException e) { } catch (NoGuidForIdException e) {
Logger.error(LOG_TAG, "Store failed for " + record.guid, e); Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} catch (NullCursorException e) { } catch (NullCursorException e) {
Logger.error(LOG_TAG, "Store failed for " + record.guid, e); Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} catch (Exception e) { } catch (Exception e) {
Logger.error(LOG_TAG, "Store failed for " + record.guid, e); Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
} }
@ -539,7 +539,7 @@ public abstract class AndroidBrowserRepositorySession extends StoreTrackingRepos
// TODO: we ought to mark the record as deleted rather than purging it, // TODO: we ought to mark the record as deleted rather than purging it,
// in order to support syncing to multiple destinations. Bug 722607. // in order to support syncing to multiple destinations. Bug 722607.
dbHelper.purgeGuid(record.guid); dbHelper.purgeGuid(record.guid);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
} }
protected void insert(Record record) throws NoGuidForIdException, NullCursorException, ParentNotFoundException { protected void insert(Record record) throws NoGuidForIdException, NullCursorException, ParentNotFoundException {
@ -552,7 +552,7 @@ public abstract class AndroidBrowserRepositorySession extends StoreTrackingRepos
updateBookkeeping(toStore); updateBookkeeping(toStore);
trackRecord(toStore); trackRecord(toStore);
delegate.onRecordStoreSucceeded(toStore); delegate.onRecordStoreSucceeded(toStore.guid);
Logger.debug(LOG_TAG, "Inserted record with guid " + toStore.guid + " as androidID " + toStore.androidID); Logger.debug(LOG_TAG, "Inserted record with guid " + toStore.guid + " as androidID " + toStore.androidID);
} }

View File

@ -12,7 +12,6 @@ import org.mozilla.gecko.db.BrowserContract;
import org.mozilla.gecko.sync.Logger; import org.mozilla.gecko.sync.Logger;
import org.mozilla.gecko.sync.repositories.NullCursorException; import org.mozilla.gecko.sync.repositories.NullCursorException;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
/** /**
* Queue up deletions. Process them at the end. * Queue up deletions. Process them at the end.
@ -222,11 +221,8 @@ public class BookmarksDeletionManager {
return; return;
} }
Logger.trace(LOG_TAG, "Invoking store callback for " + nonFolderGUIDs.length + " GUIDs."); Logger.trace(LOG_TAG, "Invoking store callback for " + nonFolderGUIDs.length + " GUIDs.");
final long now = System.currentTimeMillis();
BookmarkRecord r = new BookmarkRecord(null, "bookmarks", now, true);
for (String guid : nonFolderGUIDs) { for (String guid : nonFolderGUIDs) {
r.guid = guid; delegate.onRecordStoreSucceeded(guid);
delegate.onRecordStoreSucceeded(r);
} }
} }

View File

@ -140,11 +140,11 @@ public class FennecTabsRepository extends Repository {
public void run() { public void run() {
Logger.debug(LOG_TAG, "Storing tabs for client " + tabsRecord.guid); Logger.debug(LOG_TAG, "Storing tabs for client " + tabsRecord.guid);
if (!isActive()) { if (!isActive()) {
delegate.onRecordStoreFailed(new InactiveSessionException(null)); delegate.onRecordStoreFailed(new InactiveSessionException(null), record.guid);
return; return;
} }
if (tabsRecord.guid == null) { if (tabsRecord.guid == null) {
delegate.onRecordStoreFailed(new RuntimeException("Can't store record with null GUID.")); delegate.onRecordStoreFailed(new RuntimeException("Can't store record with null GUID."), record.guid);
return; return;
} }
@ -157,9 +157,9 @@ public class FennecTabsRepository extends Repository {
clientsProvider.delete(BrowserContract.Clients.CONTENT_URI, clientsProvider.delete(BrowserContract.Clients.CONTENT_URI,
CLIENT_GUID_IS, CLIENT_GUID_IS,
selectionArgs); selectionArgs);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
} catch (Exception e) { } catch (Exception e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
} }
return; return;
} }
@ -184,10 +184,10 @@ public class FennecTabsRepository extends Repository {
final int inserted = tabsProvider.bulkInsert(BrowserContract.Tabs.CONTENT_URI, tabsArray); final int inserted = tabsProvider.bulkInsert(BrowserContract.Tabs.CONTENT_URI, tabsArray);
Logger.trace(LOG_TAG, "Inserted: " + inserted); Logger.trace(LOG_TAG, "Inserted: " + inserted);
delegate.onRecordStoreSucceeded(tabsRecord); delegate.onRecordStoreSucceeded(record.guid);
} catch (Exception e) { } catch (Exception e) {
Logger.warn(LOG_TAG, "Error storing tabs.", e); Logger.warn(LOG_TAG, "Error storing tabs.", e);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
} }
} }
}; };

View File

@ -450,7 +450,7 @@ public class FormHistoryRepositorySession extends
try { try {
flushInsertQueue(); flushInsertQueue();
} catch (Exception e) { } catch (Exception e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
} }
@ -491,7 +491,8 @@ public class FormHistoryRepositorySession extends
} }
storeDone(now()); storeDone(now());
} catch (Exception e) { } catch (Exception e) {
delegate.onRecordStoreFailed(e); // XXX TODO
delegate.onRecordStoreFailed(e, null);
} }
} }
}; };
@ -562,7 +563,7 @@ public class FormHistoryRepositorySession extends
public void run() { public void run() {
if (!isActive()) { if (!isActive()) {
Logger.warn(LOG_TAG, "FormHistoryRepositorySession is inactive. Store failing."); Logger.warn(LOG_TAG, "FormHistoryRepositorySession is inactive. Store failing.");
delegate.onRecordStoreFailed(new InactiveSessionException(null)); delegate.onRecordStoreFailed(new InactiveSessionException(null), record.guid);
return; return;
} }
@ -605,7 +606,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "Remote modified, local not. Deleting."); Logger.trace(LOG_TAG, "Remote modified, local not. Deleting.");
deleteExistingRecord(existingRecord); deleteExistingRecord(existingRecord);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -614,7 +615,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "Remote is newer, and deleted. Purging local."); Logger.trace(LOG_TAG, "Remote is newer, and deleted. Purging local.");
deleteExistingRecord(existingRecord); deleteExistingRecord(existingRecord);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -638,7 +639,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "No match. Inserting."); Logger.trace(LOG_TAG, "No match. Inserting.");
insertNewRegularRecord(record); insertNewRegularRecord(record);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -650,7 +651,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "Remote guid different from local guid. Storing to keep remote guid."); Logger.trace(LOG_TAG, "Remote guid different from local guid. Storing to keep remote guid.");
replaceExistingRecordWithRegularRecord(record, existingRecord); replaceExistingRecordWithRegularRecord(record, existingRecord);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -660,7 +661,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "Remote modified, local not. Storing."); Logger.trace(LOG_TAG, "Remote modified, local not. Storing.");
replaceExistingRecordWithRegularRecord(record, existingRecord); replaceExistingRecordWithRegularRecord(record, existingRecord);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -669,7 +670,7 @@ public class FormHistoryRepositorySession extends
Logger.trace(LOG_TAG, "Remote is newer, and not deleted. Storing."); Logger.trace(LOG_TAG, "Remote is newer, and not deleted. Storing.");
replaceExistingRecordWithRegularRecord(record, existingRecord); replaceExistingRecordWithRegularRecord(record, existingRecord);
trackRecord(record); trackRecord(record);
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
@ -680,7 +681,7 @@ public class FormHistoryRepositorySession extends
return; return;
} catch (Exception e) { } catch (Exception e) {
Logger.error(LOG_TAG, "Store failed for " + record.guid, e); Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
} }

View File

@ -257,13 +257,13 @@ public class PasswordsRepositorySession extends
public void run() { public void run() {
if (!isActive()) { if (!isActive()) {
Logger.warn(LOG_TAG, "RepositorySession is inactive. Store failing."); Logger.warn(LOG_TAG, "RepositorySession is inactive. Store failing.");
delegate.onRecordStoreFailed(new InactiveSessionException(null)); delegate.onRecordStoreFailed(new InactiveSessionException(null), record.guid);
return; return;
} }
final String guid = remoteRecord.guid; final String guid = remoteRecord.guid;
if (guid == null) { if (guid == null) {
delegate.onRecordStoreFailed(new RuntimeException("Can't store record with null GUID.")); delegate.onRecordStoreFailed(new RuntimeException("Can't store record with null GUID."), record.guid);
return; return;
} }
@ -272,10 +272,10 @@ public class PasswordsRepositorySession extends
existingRecord = retrieveByGUID(guid); existingRecord = retrieveByGUID(guid);
} catch (NullCursorException e) { } catch (NullCursorException e) {
// Indicates a serious problem. // Indicates a serious problem.
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} catch (RemoteException e) { } catch (RemoteException e) {
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
@ -330,7 +330,15 @@ public class PasswordsRepositorySession extends
// Now we're processing a non-deleted incoming record. // Now we're processing a non-deleted incoming record.
if (existingRecord == null) { if (existingRecord == null) {
trace("Looking up match for record " + remoteRecord.guid); trace("Looking up match for record " + remoteRecord.guid);
existingRecord = findExistingRecord(remoteRecord); try {
existingRecord = findExistingRecord(remoteRecord);
} catch (RemoteException e) {
Logger.error(LOG_TAG, "Remote exception in findExistingRecord.");
delegate.onRecordStoreFailed(e, record.guid);
} catch (NullCursorException e) {
Logger.error(LOG_TAG, "Null cursor in findExistingRecord.");
delegate.onRecordStoreFailed(e, record.guid);
}
} }
if (existingRecord == null) { if (existingRecord == null) {
@ -342,11 +350,11 @@ public class PasswordsRepositorySession extends
inserted = insert(remoteRecord); inserted = insert(remoteRecord);
} catch (RemoteException e) { } catch (RemoteException e) {
Logger.debug(LOG_TAG, "Record insert caused a RemoteException."); Logger.debug(LOG_TAG, "Record insert caused a RemoteException.");
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
trackRecord(inserted); trackRecord(inserted);
delegate.onRecordStoreSucceeded(inserted); delegate.onRecordStoreSucceeded(inserted.guid);
return; return;
} }
@ -369,7 +377,7 @@ public class PasswordsRepositorySession extends
replaced = replace(existingRecord, toStore); replaced = replace(existingRecord, toStore);
} catch (RemoteException e) { } catch (RemoteException e) {
Logger.debug(LOG_TAG, "Record replace caused a RemoteException."); Logger.debug(LOG_TAG, "Record replace caused a RemoteException.");
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
@ -377,7 +385,7 @@ public class PasswordsRepositorySession extends
// of reconcileRecords. // of reconcileRecords.
Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid + Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid +
"(" + replaced.androidID + ")"); "(" + replaced.androidID + ")");
delegate.onRecordStoreSucceeded(replaced); delegate.onRecordStoreSucceeded(record.guid);
return; return;
} }
}; };
@ -581,7 +589,7 @@ public class PasswordsRepositorySession extends
Passwords.USERNAME_FIELD + " = ? AND " + Passwords.USERNAME_FIELD + " = ? AND " +
Passwords.PASSWORD_FIELD + " = ?"; Passwords.PASSWORD_FIELD + " = ?";
private PasswordRecord findExistingRecord(PasswordRecord record) { private PasswordRecord findExistingRecord(PasswordRecord record) throws NullCursorException, RemoteException {
PasswordRecord foundRecord = null; PasswordRecord foundRecord = null;
Cursor cursor = null; Cursor cursor = null;
// Only check the data table. // Only check the data table.
@ -610,12 +618,6 @@ public class PasswordsRepositorySession extends
return foundRecord; return foundRecord;
} }
} }
} catch (RemoteException e) {
Logger.error(LOG_TAG, "Remote exception in findExistingRecord.");
delegate.onRecordStoreFailed(e);
} catch (NullCursorException e) {
Logger.error(LOG_TAG, "Null cursor in findExistingRecord.");
delegate.onRecordStoreFailed(e);
} finally { } finally {
if (cursor != null) { if (cursor != null) {
cursor.close(); cursor.close();
@ -630,10 +632,10 @@ public class PasswordsRepositorySession extends
deleteGUID(record.guid); deleteGUID(record.guid);
} catch (RemoteException e) { } catch (RemoteException e) {
Logger.error(LOG_TAG, "RemoteException in password delete."); Logger.error(LOG_TAG, "RemoteException in password delete.");
delegate.onRecordStoreFailed(e); delegate.onRecordStoreFailed(e, record.guid);
return; return;
} }
delegate.onRecordStoreSucceeded(record); delegate.onRecordStoreSucceeded(record.guid);
} }
/** /**

View File

@ -6,8 +6,6 @@ package org.mozilla.gecko.sync.repositories.delegates;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.mozilla.gecko.sync.repositories.domain.Record;
public class DeferredRepositorySessionStoreDelegate implements public class DeferredRepositorySessionStoreDelegate implements
RepositorySessionStoreDelegate { RepositorySessionStoreDelegate {
protected final RepositorySessionStoreDelegate inner; protected final RepositorySessionStoreDelegate inner;
@ -20,21 +18,21 @@ public class DeferredRepositorySessionStoreDelegate implements
} }
@Override @Override
public void onRecordStoreSucceeded(final Record record) { public void onRecordStoreSucceeded(final String guid) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
inner.onRecordStoreSucceeded(record); inner.onRecordStoreSucceeded(guid);
} }
}); });
} }
@Override @Override
public void onRecordStoreFailed(final Exception ex) { public void onRecordStoreFailed(final Exception ex, final String guid) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
inner.onRecordStoreFailed(ex); inner.onRecordStoreFailed(ex, guid);
} }
}); });
} }

View File

@ -6,8 +6,6 @@ package org.mozilla.gecko.sync.repositories.delegates;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.mozilla.gecko.sync.repositories.domain.Record;
/** /**
* These methods *must* be invoked asynchronously. Use deferredStoreDelegate if you * These methods *must* be invoked asynchronously. Use deferredStoreDelegate if you
* need help doing this. * need help doing this.
@ -16,11 +14,10 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
* *
*/ */
public interface RepositorySessionStoreDelegate { public interface RepositorySessionStoreDelegate {
public void onRecordStoreFailed(Exception ex); public void onRecordStoreFailed(Exception ex, String recordGuid);
// Optionally called with an equivalent (but not necessarily identical) record // Called with a GUID when store has succeeded.
// when a store has succeeded. public void onRecordStoreSucceeded(String guid);
public void onRecordStoreSucceeded(Record record);
public void onStoreCompleted(long storeEnd); public void onStoreCompleted(long storeEnd);
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor); public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
} }

View File

@ -37,6 +37,7 @@ import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDeleg
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
import org.mozilla.gecko.sync.synchronizer.Synchronizer; import org.mozilla.gecko.sync.synchronizer.Synchronizer;
import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate; import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
@ -139,7 +140,7 @@ public abstract class ServerSyncStage implements
public Synchronizer getConfiguredSynchronizer(GlobalSession session) throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException, ParseException { public Synchronizer getConfiguredSynchronizer(GlobalSession session) throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException, ParseException {
Repository remote = wrappedServerRepo(); Repository remote = wrappedServerRepo();
Synchronizer synchronizer = new Synchronizer(); Synchronizer synchronizer = new ServerLocalSynchronizer();
synchronizer.repositoryA = remote; synchronizer.repositoryA = remote;
synchronizer.repositoryB = this.getLocalRepository(); synchronizer.repositoryB = this.getLocalRepository();
synchronizer.load(getConfig()); synchronizer.load(getConfig());
@ -522,11 +523,4 @@ public abstract class ServerSyncStage implements
session.abort(lastException, reason); session.abort(lastException, reason);
} }
} }
@Override
public void onSynchronizeAborted(Synchronizer synchronize) {
Logger.info(LOG_TAG, "onSynchronizeAborted.");
session.abort(null, "Synchronization was aborted.");
}
} }

View File

@ -84,23 +84,24 @@ class ConcurrentRecordConsumer extends RecordConsumer {
@Override @Override
public void run() { public void run() {
Record record;
while (true) { while (true) {
synchronized (monitor) {
trace("run() took monitor.");
if (stopImmediately) {
debug("Stopping immediately. Clearing queue.");
delegate.getQueue().clear();
debug("Notifying consumer.");
consumerIsDone();
return;
}
debug("run() dropped monitor.");
}
// The queue is concurrent-safe. // The queue is concurrent-safe.
while (!delegate.getQueue().isEmpty()) { while ((record = delegate.getQueue().poll()) != null) {
trace("Grabbing record..."); synchronized (monitor) {
Record record = delegate.getQueue().remove(); trace("run() took monitor.");
trace("Storing record... " + delegate); if (stopImmediately) {
debug("Stopping immediately. Clearing queue.");
delegate.getQueue().clear();
debug("Notifying consumer.");
consumerIsDone();
return;
}
debug("run() dropped monitor.");
}
trace("Storing record with guid " + record.guid + ".");
try { try {
delegate.store(record); delegate.store(record);
} catch (Exception e) { } catch (Exception e) {

View File

@ -6,6 +6,7 @@ package org.mozilla.gecko.sync.synchronizer;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.sync.Logger; import org.mozilla.gecko.sync.Logger;
import org.mozilla.gecko.sync.ThreadPool; import org.mozilla.gecko.sync.ThreadPool;
@ -59,7 +60,7 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
* @author rnewman * @author rnewman
* *
*/ */
class RecordsChannel implements public class RecordsChannel implements
RepositorySessionFetchRecordsDelegate, RepositorySessionFetchRecordsDelegate,
RepositorySessionStoreDelegate, RepositorySessionStoreDelegate,
RecordsConsumerDelegate, RecordsConsumerDelegate,
@ -72,6 +73,9 @@ class RecordsChannel implements
private long timestamp; private long timestamp;
private long fetchEnd = -1; private long fetchEnd = -1;
private final AtomicInteger numFetchFailed = new AtomicInteger();
private final AtomicInteger numStoreFailed = new AtomicInteger();
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) { public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
this.source = source; this.source = source;
this.sink = sink; this.sink = sink;
@ -102,22 +106,19 @@ class RecordsChannel implements
} }
/** /**
* Attempt to abort an outstanding fetch. Finish both sessions, and * Get the number of fetch failures recorded so far.
* halt the consumer if it exists. * @return number of fetch failures.
*/ */
public void abort() { public int getFetchFailureCount() {
if (source.isActive()) { return numFetchFailed.get();
source.abort(); }
}
if (sink.isActive()) {
sink.abort();
}
toProcess.clear(); /**
if (consumer == null) { * Get the number of store failures recorded so far.
return; * @return number of store failures.
} */
consumer.halt(); public int getStoreFailureCount() {
return numStoreFailed.get();
} }
/** /**
@ -130,8 +131,11 @@ class RecordsChannel implements
failed = sink; failed = sink;
} }
this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed)); this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed));
return;
} }
sink.setStoreDelegate(this); sink.setStoreDelegate(this);
numFetchFailed.set(0);
numStoreFailed.set(0);
// Start a consumer thread. // Start a consumer thread.
this.consumer = new ConcurrentRecordConsumer(this); this.consumer = new ConcurrentRecordConsumer(this);
ThreadPool.run(this.consumer); ThreadPool.run(this.consumer);
@ -154,14 +158,14 @@ class RecordsChannel implements
sink.store(record); sink.store(record);
} catch (NoStoreDelegateException e) { } catch (NoStoreDelegateException e) {
Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e); Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
delegate.onFlowStoreFailed(this, e); delegate.onFlowStoreFailed(this, e, record.guid);
this.abort();
} }
} }
@Override @Override
public void onFetchFailed(Exception ex, Record record) { public void onFetchFailed(Exception ex, Record record) {
Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex); Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
numFetchFailed.incrementAndGet();
this.consumer.halt(); this.consumer.halt();
delegate.onFlowFetchFailed(this, ex); delegate.onFlowFetchFailed(this, ex);
} }
@ -190,14 +194,17 @@ class RecordsChannel implements
} }
@Override @Override
public void onRecordStoreFailed(Exception ex) { public void onRecordStoreFailed(Exception ex, String recordGuid) {
Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
numStoreFailed.incrementAndGet();
this.consumer.stored(); this.consumer.stored();
delegate.onFlowStoreFailed(this, ex); delegate.onFlowStoreFailed(this, ex, recordGuid);
// TODO: abort? // TODO: abort?
} }
@Override @Override
public void onRecordStoreSucceeded(Record record) { public void onRecordStoreSucceeded(String guid) {
Logger.trace(LOG_TAG, "Stored record with guid " + guid);
this.consumer.stored(); this.consumer.stored();
} }
@ -232,6 +239,7 @@ class RecordsChannel implements
sink.begin(this); sink.begin(this);
} catch (InvalidSessionTransitionException e) { } catch (InvalidSessionTransitionException e) {
onBeginFailed(e); onBeginFailed(e);
return;
} }
} }
if (session == sink) { if (session == sink) {

View File

@ -8,6 +8,6 @@ public interface RecordsChannelDelegate {
public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd); public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd);
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid);
public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex);
} }

View File

@ -0,0 +1,17 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
/**
* A <code>SynchronizerSession</code> designed to be used between a remote
* server and a local repository.
* <p>
* See <code>ServerLocalSynchronizerSession</code> for error handling details.
*/
public class ServerLocalSynchronizer extends Synchronizer {
public SynchronizerSession getSynchronizerSession() {
return new ServerLocalSynchronizerSession(this, this);
}
}

View File

@ -0,0 +1,76 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
import org.mozilla.gecko.sync.Logger;
import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.StoreFailedException;
/**
* A <code>SynchronizerSession</code> designed to be used between a remote
* server and a local repository.
* <p>
* Handles failure cases as follows (in the order they will occur during a sync):
* <ul>
* <li>Remote fetch failures abort.</li>
* <li>Local store failures are ignored.</li>
* <li>Local fetch failures abort.</li>
* <li>Remote store failures abort.</li>
* </ul>
*/
public class ServerLocalSynchronizerSession extends SynchronizerSession {
protected static final String LOG_TAG = "ServLocSynchronizerSess";
public ServerLocalSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
super(synchronizer, delegate);
}
public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
// Fetch failures always abort.
int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
if (numRemoteFetchFailed > 0) {
final String message = "Got " + numRemoteFetchFailed + " failures fetching remote records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures fetching remote records.");
// Local store failures are ignored.
int numLocalStoreFailed = recordsChannel.getStoreFailureCount();
if (numLocalStoreFailed > 0) {
final String message = "Got " + numLocalStoreFailed + " failures storing local records!";
Logger.warn(LOG_TAG, message + " Ignoring local store failures and continuing synchronizer session.");
} else {
Logger.trace(LOG_TAG, "No failures storing local records.");
}
super.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
}
public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
// Fetch failures always abort.
int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
if (numLocalFetchFailed > 0) {
final String message = "Got " + numLocalFetchFailed + " failures fetching local records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures fetching local records.");
// Remote store failures abort!
int numRemoteStoreFailed = recordsChannel.getStoreFailureCount();
if (numRemoteStoreFailed > 0) {
final String message = "Got " + numRemoteStoreFailed + " failures storing remote records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new StoreFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures storing remote records.");
super.onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
}
}

View File

@ -27,72 +27,36 @@ import android.util.Log;
* After synchronizing, call `save` to get back a SynchronizerConfiguration with * After synchronizing, call `save` to get back a SynchronizerConfiguration with
* updated bundle information. * updated bundle information.
*/ */
public class Synchronizer { public class Synchronizer implements SynchronizerSessionDelegate {
public static final String LOG_TAG = "SyncDelSDelegate";
protected String configSyncID; // Used to pass syncID from load() back into save(). protected String configSyncID; // Used to pass syncID from load() back into save().
/** protected SynchronizerDelegate synchronizerDelegate;
* I translate the fine-grained feedback of a SynchronizerSessionDelegate into
* the coarse-grained feedback of a SynchronizerDelegate.
*/
public class SynchronizerDelegateSessionDelegate implements
SynchronizerSessionDelegate {
private static final String LOG_TAG = "SyncDelSDelegate"; @Override
private SynchronizerDelegate synchronizerDelegate; public void onInitialized(SynchronizerSession session) {
private SynchronizerSession session; session.synchronize();
}
public SynchronizerDelegateSessionDelegate(SynchronizerDelegate delegate) { @Override
this.synchronizerDelegate = delegate; public void onSynchronized(SynchronizerSession synchronizerSession) {
} Log.d(LOG_TAG, "Got onSynchronized.");
Log.d(LOG_TAG, "Notifying SynchronizerDelegate.");
this.synchronizerDelegate.onSynchronized(synchronizerSession.getSynchronizer());
}
@Override @Override
public void onInitialized(SynchronizerSession session) { public void onSynchronizeSkipped(SynchronizerSession synchronizerSession) {
this.session = session; Log.d(LOG_TAG, "Got onSynchronizeSkipped.");
session.synchronize(); Log.d(LOG_TAG, "Notifying SynchronizerDelegate as if on success.");
} this.synchronizerDelegate.onSynchronized(synchronizerSession.getSynchronizer());
}
@Override @Override
public void onSynchronized(SynchronizerSession synchronizerSession) { public void onSynchronizeFailed(SynchronizerSession session,
Log.d(LOG_TAG, "Got onSynchronized."); Exception lastException, String reason) {
Log.d(LOG_TAG, "Notifying SynchronizerDelegate."); this.synchronizerDelegate.onSynchronizeFailed(session.getSynchronizer(), lastException, reason);
this.synchronizerDelegate.onSynchronized(synchronizerSession.getSynchronizer());
}
@Override
public void onSynchronizeSkipped(SynchronizerSession synchronizerSession) {
Log.d(LOG_TAG, "Got onSynchronizeSkipped.");
Log.d(LOG_TAG, "Notifying SynchronizerDelegate as if on success.");
this.synchronizerDelegate.onSynchronized(synchronizerSession.getSynchronizer());
}
@Override
public void onSynchronizeFailed(SynchronizerSession session,
Exception lastException, String reason) {
this.synchronizerDelegate.onSynchronizeFailed(session.getSynchronizer(), lastException, reason);
}
@Override
public void onSynchronizeAborted(SynchronizerSession synchronizerSession) {
this.synchronizerDelegate.onSynchronizeAborted(session.getSynchronizer());
}
@Override
public void onFetchError(Exception e) {
session.abort();
synchronizerDelegate.onSynchronizeFailed(session.getSynchronizer(), e, "Got fetch error.");
}
@Override
public void onStoreError(Exception e) {
session.abort();
synchronizerDelegate.onSynchronizeFailed(session.getSynchronizer(), e, "Got store error.");
}
@Override
public void onSessionError(Exception e) {
session.abort();
synchronizerDelegate.onSynchronizeFailed(session.getSynchronizer(), e, "Got session error.");
}
} }
public Repository repositoryA; public Repository repositoryA;
@ -100,12 +64,19 @@ public class Synchronizer {
public RepositorySessionBundle bundleA; public RepositorySessionBundle bundleA;
public RepositorySessionBundle bundleB; public RepositorySessionBundle bundleB;
/**
* Fetch a synchronizer session appropriate for this <code>Synchronizer</code>
*/
public SynchronizerSession getSynchronizerSession() {
return new SynchronizerSession(this, this);
}
/** /**
* Start synchronizing, calling delegate's callback methods. * Start synchronizing, calling delegate's callback methods.
*/ */
public void synchronize(Context context, SynchronizerDelegate delegate) { public void synchronize(Context context, SynchronizerDelegate delegate) {
SynchronizerDelegateSessionDelegate sessionDelegate = new SynchronizerDelegateSessionDelegate(delegate); this.synchronizerDelegate = delegate;
SynchronizerSession session = new SynchronizerSession(this, sessionDelegate); SynchronizerSession session = getSynchronizerSession();
session.init(context, bundleA, bundleB); session.init(context, bundleA, bundleB);
} }

View File

@ -7,5 +7,4 @@ package org.mozilla.gecko.sync.synchronizer;
public interface SynchronizerDelegate { public interface SynchronizerDelegate {
public void onSynchronized(Synchronizer synchronizer); public void onSynchronized(Synchronizer synchronizer);
public void onSynchronizeFailed(Synchronizer synchronizer, Exception lastException, String reason); public void onSynchronizeFailed(Synchronizer synchronizer, Exception lastException, String reason);
public void onSynchronizeAborted(Synchronizer synchronize);
} }

View File

@ -51,9 +51,9 @@ implements RecordsChannelDelegate,
RepositorySessionFinishDelegate { RepositorySessionFinishDelegate {
protected static final String LOG_TAG = "SynchronizerSession"; protected static final String LOG_TAG = "SynchronizerSession";
private Synchronizer synchronizer; protected Synchronizer synchronizer;
private SynchronizerSessionDelegate delegate; protected SynchronizerSessionDelegate delegate;
private Context context; protected Context context;
/* /*
* Computed during init. * Computed during init.
@ -103,20 +103,6 @@ implements RecordsChannelDelegate,
protected RecordsChannel channelAToB; protected RecordsChannel channelAToB;
protected RecordsChannel channelBToA; protected RecordsChannel channelBToA;
public synchronized void abort() {
// Guaranteed to have been begun by the time we get to run.
if (channelAToB != null) {
channelAToB.abort();
}
// Not guaranteed. It's possible for the second flow to begin after we've aborted.
// TODO: stop this from happening!
if (channelBToA != null) {
channelBToA.abort();
}
this.delegate.onSynchronizeAborted(this);
}
/** /**
* Please don't call this until you've been notified with onInitialized. * Please don't call this until you've been notified with onInitialized.
*/ */
@ -142,38 +128,29 @@ implements RecordsChannelDelegate,
// This is the delegate for the *first* flow. // This is the delegate for the *first* flow.
RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() { RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
Logger.info(LOG_TAG, "First RecordsChannel onFlowCompleted. Fetch end is " + fetchEnd + session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
". Store end is " + storeEnd + ". Starting next.");
pendingATimestamp = fetchEnd;
storeEndBTimestamp = storeEnd;
flowAToBCompleted = true;
channelBToA.flow();
} }
@Override @Override
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Reporting session error.", ex); Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
session.delegate.onSessionError(ex); session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
} }
@Override @Override
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
// TODO: clean up, tear down, abort. Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex);
Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Reporting fetch error.", ex);
session.delegate.onFetchError(ex);
} }
@Override @Override
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
// TODO: clean up, tear down, abort. Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex);
Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Reporting store error.", ex);
session.delegate.onStoreError(ex);
} }
@Override @Override
public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Reporting session error.", ex); Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
session.delegate.onSessionError(ex); session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow.");
} }
}; };
@ -188,10 +165,34 @@ implements RecordsChannelDelegate,
} }
} }
@Override /**
public void onFlowCompleted(RecordsChannel channel, long fetchEnd, long storeEnd) { * Called after the first flow completes.
Logger.info(LOG_TAG, "Second RecordsChannel onFlowCompleted. Fetch end is " + fetchEnd + * <p>
". Store end is " + storeEnd + ". Finishing."); * By default, any fetch and store failures are ignored.
* @param recordsChannel the <code>RecordsChannel</code> (for error testing).
* @param fetchEnd timestamp when fetches completed.
* @param storeEnd timestamp when stores completed.
*/
public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
Logger.info(LOG_TAG, "First RecordsChannel onFlowCompleted.");
Logger.info(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
pendingATimestamp = fetchEnd;
storeEndBTimestamp = storeEnd;
flowAToBCompleted = true;
channelBToA.flow();
}
/**
* Called after the second flow completes.
* <p>
* By default, any fetch and store failures are ignored.
* @param recordsChannel the <code>RecordsChannel</code> (for error testing).
* @param fetchEnd timestamp when fetches completed.
* @param storeEnd timestamp when stores completed.
*/
public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
Logger.info(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
Logger.info(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");
pendingBTimestamp = fetchEnd; pendingBTimestamp = fetchEnd;
storeEndATimestamp = storeEnd; storeEndATimestamp = storeEnd;
@ -202,39 +203,35 @@ implements RecordsChannelDelegate,
this.sessionA.finish(this); this.sessionA.finish(this);
} catch (InactiveSessionException e) { } catch (InactiveSessionException e) {
this.onFinishFailed(e); this.onFinishFailed(e);
return;
} }
} }
@Override
public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
}
@Override @Override
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Reporting session error.", ex); Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
this.delegate.onSessionError(ex); this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
} }
@Override @Override
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
// TODO: clean up, tear down, abort. Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex);
Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Reporting fetch error.", ex);
this.delegate.onFetchError(ex);
} }
/**
* We ignore possible store errors, since failure to store a record is not
* necessarily a cause to abort. It might mean that the record should be
* tracked for re-downloading, or skipped, or we might abort.
*
* TODO: Bug 709371.
*/
@Override @Override
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
// TODO: clean up, tear down, abort. Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex);
Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Ignoring store error.", ex);
} }
@Override @Override
public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Reporting session error.", ex); Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
this.delegate.onSessionError(ex); this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow.");
} }
/* /*
@ -261,7 +258,7 @@ implements RecordsChannelDelegate,
} }
// We no longer need a reference to our context. // We no longer need a reference to our context.
this.context = null; this.context = null;
this.delegate.onSessionError(ex); this.delegate.onSynchronizeFailed(this, ex, "Failed to create session");
} }
/** /**
@ -276,7 +273,7 @@ implements RecordsChannelDelegate,
if (session == null || if (session == null ||
this.sessionA == session) { this.sessionA == session) {
// TODO: clean up sessionA. // TODO: clean up sessionA.
this.delegate.onSessionError(new UnexpectedSessionException(session)); this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
return; return;
} }
if (this.sessionA == null) { if (this.sessionA == null) {
@ -286,7 +283,7 @@ implements RecordsChannelDelegate,
try { try {
this.sessionA.unbundle(this.bundleA); this.sessionA.unbundle(this.bundleA);
} catch (Exception e) { } catch (Exception e) {
this.delegate.onSessionError(new UnbundleError(e, sessionA)); this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session.");
// TODO: abort // TODO: abort
return; return;
} }
@ -302,8 +299,7 @@ implements RecordsChannelDelegate,
try { try {
this.sessionB.unbundle(this.bundleB); this.sessionB.unbundle(this.bundleB);
} catch (Exception e) { } catch (Exception e) {
// TODO: abort this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session.");
this.delegate.onSessionError(new UnbundleError(e, sessionB));
return; return;
} }
@ -311,7 +307,7 @@ implements RecordsChannelDelegate,
return; return;
} }
// TODO: need a way to make sure we don't call any more delegate methods. // TODO: need a way to make sure we don't call any more delegate methods.
this.delegate.onSessionError(new UnexpectedSessionException(session)); this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
} }
/* /*
@ -354,7 +350,8 @@ implements RecordsChannelDelegate,
this.synchronizer.bundleA = bundle; this.synchronizer.bundleA = bundle;
} else { } else {
// Should not happen! // Should not happen!
this.delegate.onSessionError(new UnexpectedSessionException(sessionA)); this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session.");
return;
} }
if (this.sessionB != null) { if (this.sessionB != null) {
Logger.info(LOG_TAG, "Finishing session B."); Logger.info(LOG_TAG, "Finishing session B.");
@ -363,6 +360,7 @@ implements RecordsChannelDelegate,
this.sessionB.finish(this); this.sessionB.finish(this);
} catch (InactiveSessionException e) { } catch (InactiveSessionException e) {
this.onFinishFailed(e); this.onFinishFailed(e);
return;
} }
} }
} else if (session == sessionB) { } else if (session == sessionB) {
@ -374,7 +372,8 @@ implements RecordsChannelDelegate,
this.delegate.onSynchronized(this); this.delegate.onSynchronized(this);
} else { } else {
// Should not happen! // Should not happen!
this.delegate.onSessionError(new UnexpectedSessionException(sessionB)); this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session.");
return;
} }
} else { } else {
// TODO: hurrrrrr... // TODO: hurrrrrr...

View File

@ -9,12 +9,5 @@ public interface SynchronizerSessionDelegate {
public void onSynchronized(SynchronizerSession session); public void onSynchronized(SynchronizerSession session);
public void onSynchronizeFailed(SynchronizerSession session, Exception lastException, String reason); public void onSynchronizeFailed(SynchronizerSession session, Exception lastException, String reason);
public void onSynchronizeAborted(SynchronizerSession synchronizerSession);
public void onSynchronizeSkipped(SynchronizerSession synchronizerSession); public void onSynchronizeSkipped(SynchronizerSession synchronizerSession);
// TODO: return value?
public void onFetchError(Exception e);
public void onStoreError(Exception e);
public void onSessionError(Exception e);
} }

View File

@ -136,6 +136,7 @@ sync/repositories/domain/PasswordRecord.java
sync/repositories/domain/Record.java sync/repositories/domain/Record.java
sync/repositories/domain/TabsRecord.java sync/repositories/domain/TabsRecord.java
sync/repositories/domain/VersionConstants.java sync/repositories/domain/VersionConstants.java
sync/repositories/FetchFailedException.java
sync/repositories/HashSetStoreTracker.java sync/repositories/HashSetStoreTracker.java
sync/repositories/HistoryRepository.java sync/repositories/HistoryRepository.java
sync/repositories/IdentityRecordFactory.java sync/repositories/IdentityRecordFactory.java
@ -157,8 +158,11 @@ sync/repositories/RepositorySession.java
sync/repositories/RepositorySessionBundle.java sync/repositories/RepositorySessionBundle.java
sync/repositories/Server11Repository.java sync/repositories/Server11Repository.java
sync/repositories/Server11RepositorySession.java sync/repositories/Server11RepositorySession.java
sync/repositories/StoreFailedException.java
sync/repositories/StoreTracker.java sync/repositories/StoreTracker.java
sync/repositories/StoreTrackingRepositorySession.java sync/repositories/StoreTrackingRepositorySession.java
sync/Server11PreviousPostFailedException.java
sync/Server11RecordPostFailedException.java
sync/setup/activities/AccountActivity.java sync/setup/activities/AccountActivity.java
sync/setup/activities/ActivityUtils.java sync/setup/activities/ActivityUtils.java
sync/setup/activities/SetupFailureActivity.java sync/setup/activities/SetupFailureActivity.java
@ -198,6 +202,8 @@ sync/synchronizer/RecordsChannel.java
sync/synchronizer/RecordsChannelDelegate.java sync/synchronizer/RecordsChannelDelegate.java
sync/synchronizer/RecordsConsumerDelegate.java sync/synchronizer/RecordsConsumerDelegate.java
sync/synchronizer/SerialRecordConsumer.java sync/synchronizer/SerialRecordConsumer.java
sync/synchronizer/ServerLocalSynchronizer.java
sync/synchronizer/ServerLocalSynchronizerSession.java
sync/synchronizer/SessionNotBegunException.java sync/synchronizer/SessionNotBegunException.java
sync/synchronizer/Synchronizer.java sync/synchronizer/Synchronizer.java
sync/synchronizer/SynchronizerDelegate.java sync/synchronizer/SynchronizerDelegate.java