2012-05-21 04:12:37 -07:00
|
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
2012-01-14 09:20:31 -08:00
|
|
|
|
|
|
|
package org.mozilla.gecko.sync.synchronizer;
|
|
|
|
|
2012-02-15 22:05:52 -08:00
|
|
|
import org.mozilla.gecko.sync.Logger;
|
2012-01-14 09:20:31 -08:00
|
|
|
import org.mozilla.gecko.sync.repositories.domain.Record;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Consume records from a queue inside a RecordsChannel, as fast as we can.
|
|
|
|
* TODO: rewrite this in terms of an ExecutorService and a CompletionService.
|
|
|
|
* See Bug 713483.
|
|
|
|
*
|
|
|
|
* @author rnewman
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
class ConcurrentRecordConsumer extends RecordConsumer {
|
2012-02-15 22:05:52 -08:00
|
|
|
private static final String LOG_TAG = "CRecordConsumer";
|
2012-01-14 09:20:31 -08:00
|
|
|
|
|
|
|
/**
|
|
|
|
* When this is true and all records have been processed, the consumer
|
|
|
|
* will notify its delegate.
|
|
|
|
*/
|
|
|
|
protected boolean allRecordsQueued = false;
|
|
|
|
private long counter = 0;
|
|
|
|
|
|
|
|
public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
|
|
|
|
this.delegate = delegate;
|
|
|
|
}
|
|
|
|
|
|
|
|
private Object monitor = new Object();
|
|
|
|
@Override
|
|
|
|
public void doNotify() {
|
|
|
|
synchronized (monitor) {
|
|
|
|
monitor.notify();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void queueFilled() {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Queue filled.");
|
2012-01-14 09:20:31 -08:00
|
|
|
synchronized (monitor) {
|
|
|
|
this.allRecordsQueued = true;
|
|
|
|
monitor.notify();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void halt() {
|
|
|
|
synchronized (monitor) {
|
|
|
|
this.stopImmediately = true;
|
|
|
|
monitor.notify();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private Object countMonitor = new Object();
|
|
|
|
@Override
|
|
|
|
public void stored() {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "Record stored. Notifying.");
|
2012-01-14 09:20:31 -08:00
|
|
|
synchronized (countMonitor) {
|
|
|
|
counter++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void consumerIsDone() {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
|
2012-01-14 09:20:31 -08:00
|
|
|
delegate.consumerIsDone(!allRecordsQueued);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void run() {
|
2012-05-31 12:21:08 -07:00
|
|
|
Record record;
|
|
|
|
|
2012-01-14 09:20:31 -08:00
|
|
|
while (true) {
|
|
|
|
// The queue is concurrent-safe.
|
2012-05-31 12:21:08 -07:00
|
|
|
while ((record = delegate.getQueue().poll()) != null) {
|
|
|
|
synchronized (monitor) {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "run() took monitor.");
|
2012-05-31 12:21:08 -07:00
|
|
|
if (stopImmediately) {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
|
2012-05-31 12:21:08 -07:00
|
|
|
delegate.getQueue().clear();
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Notifying consumer.");
|
2012-05-31 12:21:08 -07:00
|
|
|
consumerIsDone();
|
|
|
|
return;
|
|
|
|
}
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "run() dropped monitor.");
|
2012-05-31 12:21:08 -07:00
|
|
|
}
|
|
|
|
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
|
2012-02-03 13:09:29 -08:00
|
|
|
try {
|
|
|
|
delegate.store(record);
|
|
|
|
} catch (Exception e) {
|
|
|
|
// TODO: Bug 709371: track records that failed to apply.
|
2012-06-12 12:12:43 -07:00
|
|
|
Logger.error(LOG_TAG, "Caught error in store.", e);
|
2012-02-03 13:09:29 -08:00
|
|
|
}
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "Done with record.");
|
2012-01-14 09:20:31 -08:00
|
|
|
}
|
|
|
|
synchronized (monitor) {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "run() took monitor.");
|
2012-01-14 09:20:31 -08:00
|
|
|
|
|
|
|
if (allRecordsQueued) {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
|
2012-01-14 09:20:31 -08:00
|
|
|
consumerIsDone();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (stopImmediately) {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
|
2012-01-14 09:20:31 -08:00
|
|
|
consumerIsDone();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
try {
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
|
2012-01-14 09:20:31 -08:00
|
|
|
monitor.wait(10000);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// TODO
|
|
|
|
}
|
2012-07-06 13:01:10 -07:00
|
|
|
Logger.trace(LOG_TAG, "run() dropped monitor.");
|
2012-01-14 09:20:31 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|