| Method from org.apache.lucene.index.ConcurrentMergeScheduler Detail: |
public static boolean anyUnhandledExceptions() {
synchronized(allInstances) {
final int count = allInstances.size();
// Make sure all outstanding threads are done so we see
// any exceptions they may produce:
for(int i=0;i< count;i++)
((ConcurrentMergeScheduler) allInstances.get(i)).sync();
boolean v = anyExceptions;
anyExceptions = false;
return v;
}
}
|
void clearSuppressExceptions() {
suppressExceptions = false;
}
|
public static void clearUnhandledExceptions() {
synchronized(allInstances) {
anyExceptions = false;
}
}
|
public void close() {
closed = true;
}
|
protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
writer.merge(merge);
}
|
public int getMaxThreadCount() {
return maxThreadCount;
}
Get the max # simultaneous threads that may be
running. @see #setMaxThreadCount. |
protected synchronized ConcurrentMergeScheduler.MergeThread getMergeThread(IndexWriter writer,
MergePolicy.OneMerge merge) throws IOException {
final MergeThread thread = new MergeThread(writer, merge);
thread.setThreadPriority(mergeThreadPriority);
thread.setDaemon(true);
thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
return thread;
}
Create and return a new MergeThread |
public synchronized int getMergeThreadPriority() {
initMergeThreadPriority();
return mergeThreadPriority;
}
Return the priority that merge threads run at. By
default the priority is 1 plus the priority of (ie,
slightly higher priority than) the first thread that
calls merge. |
protected void handleMergeException(Throwable exc) {
throw new MergePolicy.MergeException(exc, dir);
}
Called when an exception is hit in a background merge
thread |
public void merge(IndexWriter writer) throws IOException, CorruptIndexException {
// TODO: enable this once we are on JRE 1.5
// assert !Thread.holdsLock(writer);
this.writer = writer;
initMergeThreadPriority();
dir = writer.getDirectory();
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
message("now merge");
message(" index: " + writer.segString());
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while(true) {
// TODO: we could be careful about which merges to do in
// the BG (eg maybe the "biggest" ones) vs FG, which
// merges to do first (the easiest ones?), etc.
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) {
message(" no more merges pending; now return");
return;
}
// We do this w/ the primary thread to keep
// deterministic assignment of segment names
writer.mergeInit(merge);
synchronized(this) {
while (mergeThreadCount() >= maxThreadCount) {
message(" too many merge threads running; stalling...");
try {
wait();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
message(" consider merge " + merge.segString(dir));
assert mergeThreadCount() < maxThreadCount;
// OK to spawn a new merge thread to handle this
// merge:
final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
message(" launch new thread [" + merger.getName() + "]");
merger.start();
}
}
}
|
public void setMaxThreadCount(int count) {
if (count < 1)
throw new IllegalArgumentException("count should be at least 1");
maxThreadCount = count;
}
Sets the max # simultaneous threads that may be
running. If a merge is necessary yet we already have
this many threads running, the incoming thread (that
is calling add/updateDocument) will block until
a merge thread has completed. |
public synchronized void setMergeThreadPriority(int pri) {
if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
mergeThreadPriority = pri;
final int numThreads = mergeThreadCount();
for(int i=0;i< numThreads;i++) {
MergeThread merge = (MergeThread) mergeThreads.get(i);
merge.setThreadPriority(pri);
}
}
Return the priority that merge threads run at. |
void setSuppressExceptions() {
suppressExceptions = true;
}
|
public static void setTestMode() {
allInstances = new ArrayList();
}
|
public synchronized void sync() {
while(mergeThreadCount() > 0) {
message("now wait for threads; currently " + mergeThreads.size() + " still running");
final int count = mergeThreads.size();
for(int i=0;i< count;i++)
message(" " + i + ": " + ((MergeThread) mergeThreads.get(i)));
try {
wait();
} catch (InterruptedException e) {
}
}
}
|