Source code: com/telefonicasoluciones/search/server/HLWorker.java
1 package com.telefonicasoluciones.search.server;
2
3
4
5 import java.io.*;
6
7 import java.net.*;
8
9 import java.util.*;
10
11 import org.apache.lucene.document.*;
12
13 import org.apache.lucene.index.*;
14
15 import org.apache.lucene.store.*;
16
17 import com.telefonicasoluciones.search.server.parser.xml.*;
18
19 import com.telefonicasoluciones.search.server.util.*;
20
21
22
23 public class HLWorker {
24
25 private static int nextWorkerID = 0;
26
27 private int workerID;
28
29 private HLThreadFIFO idleWorkers;
30
31 private HLThreadFIFO handoffBox;
32
33 private Thread internalThread;
34
35 private Socket s_worker;
36
37 private volatile boolean noStopRequested;
38
39 private HLHandler main_hlh;
40
41 private HLHandler hlh;
42
43
44
45 public HLWorker(HLThreadFIFO idleWorkers, HLHandler handler) {
46
47 this.idleWorkers = idleWorkers;
48
49 this.main_hlh = handler;
50
51 /*
52
53 * Identificador.
54
55 */
56
57 workerID = getNextWorkerID();
58
59 /*
60
61 * FIFO con sólo un slot.
62
63 */
64
65 handoffBox = new HLThreadFIFO(1);
66
67 noStopRequested = true;
68
69 Runnable r = new Runnable() {
70
71 public void run() {
72
73 try {
74
75 runWork();
76
77 } catch (Exception _ex) {
78
79 _ex.printStackTrace();
80
81 }
82
83 }
84
85 };
86
87 internalThread = new Thread(r);
88
89 //internalThread.setPriority(Thread.NORM_PRIORITY);
90
91 internalThread.setDaemon(true);
92
93 internalThread.start();
94
95 }
96
97 public static synchronized int getNextWorkerID() {
98
99 /*
100
101 * Sincroniza a nivel de clase para
102
103 * asegurarse que sea único
104
105 */
106
107 int id = nextWorkerID;
108
109 nextWorkerID++;
110
111 return id;
112
113 }
114
115 public void processRequest(Socket s) throws InterruptedException {
116
117 handoffBox.add(s);
118
119 }
120
121 private void runWork() {
122
123 s_worker = null;
124
125 while (noStopRequested) {
126
127 try {
128
129 /*
130
131 * Este worker está listo para
132
133 * atender peticiones y se añade
134
135 * a la cola de workers.
136
137 */
138
139 idleWorkers.add(this);
140
141 /*
142
143 * Espera hatsa que el server ponga
144
145 * un socket dentro del slot handoff.
146
147 */
148
149 s_worker = (Socket) handoffBox.remove();
150
151 BufferedReader isreader = new BufferedReader(new InputStreamReader(s_worker.getInputStream()));
152
153 String command = new String();
154
155 while(true) {
156
157 try {
158
159 command = isreader.readLine().toLowerCase().trim();
160
161 if(processCommand(command)) break;
162
163 } catch (Exception _ex) {
164
165 break;
166
167 }
168
169 }
170
171 isreader.close();
172
173 isreader = null;
174
175 command = null;
176
177 } catch (IOException _ex) {
178
179 System.err.println("Artlight worker i/o error: "+_ex.getMessage());
180
181 } catch (InterruptedException _ex) {
182
183 Thread.currentThread().interrupt();
184
185 } finally {
186
187 if (hlh!=null) {
188
189 hlh = null;
190
191 }
192
193 if (s_worker!=null) {
194
195 try {
196
197 s_worker.close();
198
199 } catch (IOException _ex) {
200
201 // Ignorada
202
203 } finally {
204
205 s_worker = null;
206
207 }
208
209 }
210
211 }
212
213 }
214
215 }
216
217 private boolean processCommand(String command) throws Exception {
218
219 if(!(command.length()>0)) return false;
220
221 boolean active = (hlh!=null);
222
223 StringBuffer reply = new StringBuffer();
224
225 Object[] tokens = getTokens(command);
226
227
228
229 TagToken cmd = new TagToken((String) tokens[0]);
230
231
232
233 /*
234
235 * Protocol 2.0 commands
236
237 */
238
239 if(!cmd.isEndTag()&&cmd.getName().equals("client:session")) {
240
241 if(cmd.getAttribute("instance")!=null) {
242
243 try {
244
245 hlh = main_hlh.getInstance(cmd.getAttribute("instance"));
246
247 reply.append("<server:session host=\"");
248
249 reply.append(s_worker.getLocalAddress().getHostName());
250
251 reply.append("\">");
252
253 hlh.writeLog(HLHandler.CONNECTION_LOG,"server session ip=\""+s_worker.getInetAddress().getHostAddress()+"\" date=\""+new Date(System.currentTimeMillis()).toString()+"\"");
254
255 writeResponse(reply.toString());
256
257 return false;
258
259 } catch (NullPointerException _ex) {
260
261 reply.append("<sever:error type=\"command\" name=\"session\" id=\"301\">");
262
263 reply.append("Nullpointer");
264
265 reply.append("</server:error>");
266
267 writeResponse(reply.toString());
268
269 throw new Exception("NullPointer");
270
271 } catch (Exception _ex) {
272
273 reply.append("<sever:error type=\"command\" name=\"session\" id=\"302\">");
274
275 reply.append(_ex.getMessage());
276
277 reply.append("</server:error>");
278
279 writeResponse(reply.toString());
280
281 throw _ex;
282
283 }
284
285 } else {
286
287 reply.append("<sever:error type=\"command\" name=\"session\" id=\"305\">");
288
289 reply.append("Syntax error");
290
291 reply.append("</server:error>");
292
293 writeResponse(reply.toString());
294
295 throw new Exception("Syntax error");
296
297 }
298
299 } else if(active&&cmd.isEndTag()&&cmd.getName().equals("client:session")) {
300
301 reply.append("</server:session>");
302
303 writeResponse(reply.toString());
304
305 return true;
306
307 } else if(active&&cmd.getName().equals("description")) {
308
309 reply.append("<server:description version=\"1.4\" name=\"");
310
311 reply.append(s_worker.getLocalAddress().getHostName());
312
313 reply.append("\" protocol=\"2.0\"/>");
314
315 writeResponse(reply.toString());
316
317 return false;
318
319 } else if (active&&cmd.getName().equals("index")) {
320
321 try {
322
323 if(tokens.length>1) {
324
325 TagToken endTag = new TagToken((String) tokens[2]);
326
327 if(!endTag.isEndTag()) throw new Exception("Syntax error");
328
329 } else {
330
331 throw new Exception("Syntax error");
332
333 }
334
335 URL u = new URL((String) tokens[1]);
336
337 try {
338
339 HLIndex index = new HLIndex(s_worker, hlh);
340
341 if(cmd.getAttribute("overwrite")!=null&&cmd.getAttribute("overwrite").equals("true")) {
342
343 index.deleteOldDocuments(true);
344
345 }
346
347 if(cmd.getAttribute("recursive")!=null&&cmd.getAttribute("recursive").equals("true")) {
348
349 index.setRecursive(true);
350
351 }
352
353 index.index(u);
354
355 return false;
356
357 } catch (NullPointerException _ex) {
358
359 reply.append("<sever:error type=\"command\" name=\"index\" id=\"301\">");
360
361 reply.append("Nullpointer");
362
363 reply.append("</server:error>");
364
365 writeResponse(reply.toString());
366
367 throw new Exception("NullPointer");
368
369 } catch (Exception _ex) {
370
371 reply.append("<sever:error type=\"command\" name=\"index\" id=\"302\">");
372
373 reply.append(_ex.getMessage());
374
375 reply.append("</server:error>");
376
377 hlh.writeLog(HLHandler.ERROR_LOG,"302 name=\"index\" "+_ex.getMessage());
378
379 writeResponse(reply.toString());
380
381 throw _ex;
382
383 }
384
385 } catch(Exception _ex) {
386
387 reply.append("<sever:error type=\"command\" name=\"index\" id=\"305\">");
388
389 reply.append("Syntax error");
390
391 reply.append("</server:error>");
392
393 writeResponse(reply.toString());
394
395 throw new Exception("Syntax error");
396
397 }
398
399 } else if(active&&cmd.getName().equals("find")) {
400
401 try {
402
403 if(tokens.length>1) {
404
405 TagToken endTag = new TagToken((String) tokens[2]);
406
407 if(!endTag.isEndTag()) throw new Exception("Syntax error");
408
409 } else {
410
411 throw new Exception("Syntax error");
412
413 }
414
415 if(cmd.getAttribute("page")==null||cmd.getAttribute("lines")==null) {
416
417 throw new Exception("Syntax error");
418
419 }
420
421 try {
422
423 String query = (String) tokens[1];
424
425 int pagenumber = Integer.parseInt(cmd.getAttribute("page"));
426
427 int lines = Integer.parseInt(cmd.getAttribute("lines"));
428
429 /*
430
431 * Se parsea la query, identificando
432
433 * términos particulares.
434
435 */
436
437 HashMap fields = null;
438
439 if(query.indexOf(":")!=-1) {
440
441 fields = new HashMap();
442
443 int offset = 0;
444
445 while(true) {
446
447 String token;
448
449 if(query.indexOf(" ",offset)!=-1) {
450
451 token = query.substring(offset,query.indexOf(" ",offset));
452
453 } else {
454
455 token = query.substring(offset,query.length());
456
457 }
458
459 if(token.indexOf(":")!=-1) {
460
461 fields.put(token.substring(0,token.indexOf(":")),token.substring(token.indexOf(":")+1,token.length()));
462
463 } else {
464
465 if(fields.containsKey("contents")) {
466
467 fields.put("contents",((String)fields.get("contents"))+" "+token);
468
469 } else {
470
471 fields.put("contents",token);
472
473 }
474
475 }
476
477 offset = query.indexOf(" ",offset)+1;
478
479 if(offset<1) break;
480
481 }
482
483 }
484
485
486
487 /*
488
489 * Se realiza la búsqueda, utilizando
490
491 * el conjunto de términos
492
493 */
494
495 HLSearch search = new HLSearch(hlh);
496
497 if(fields==null) {
498
499 search.search("contents",query);
500
501 } else {
502
503 search.search(fields);
504
505 }
506
507 int total = search.getTotalMatches();
508
509 ArrayList totalResults = search.getResults();
510
511 int cursor = pagenumber*lines;
512
513 lines = cursor+lines;
514
515 ArrayList results = new ArrayList();
516
517 while(cursor<lines&&cursor<total) {
518
519 results.add(totalResults.get(cursor));
520
521 cursor++;
522
523 }
524
525 ByteArrayOutputStream baos = new ByteArrayOutputStream();
526
527 ObjectOutputStream oos = new ObjectOutputStream(baos);
528
529 oos.writeObject(results);
530
531 writeResponse("<find type=\"results\" total=\""+total+"\">");
532
533 writeResponse(baos.toByteArray());
534
535 writeResponse("</find>");
536
537 baos.close();
538
539 return false;
540
541 } catch (NumberFormatException _ex) {
542
543 reply.append("<sever:error type=\"command\" name=\"find\" id=\"305\">");
544
545 reply.append("Syntax error");
546
547 reply.append("</server:error>");
548
549 writeResponse(reply.toString());
550
551 throw new Exception("Syntax error");
552
553 } catch (Exception _ex) {
554
555 reply.append("<sever:error type=\"command\" name=\"find\" id=\"302\">");
556
557 reply.append(_ex.getMessage());
558
559 reply.append("</server:error>");
560
561 hlh.writeLog(HLHandler.ERROR_LOG,"302 name=\"find\" "+_ex.getMessage());
562
563 writeResponse(reply.toString());
564
565 throw _ex;
566
567 }
568
569 } catch(Exception _ex) {
570
571 reply.append("<sever:error type=\"command\" name=\"find\" id=\"305\">");
572
573 reply.append("Syntax error");
574
575 reply.append("</server:error>");
576
577 writeResponse(reply.toString());
578
579 throw _ex;
580
581 }
582
583 } else if (active&&cmd.getName().equals("deleteall")) {
584
585 try {
586
587 IndexReader reader = IndexReader.open(FSDirectory.getDirectory(hlh.getIndexDirectory().getCanonicalPath(), false));
588
589 for(int i = reader.maxDoc(); --i >= 0; )
590
591 reader.delete(i);
592
593 reader.close();
594
595 reply.append("<server:delete status=\"finished\"/>");
596
597 hlh.writeLog(HLHandler.INDEX_LOG,"Index deleted");
598
599 writeResponse(reply.toString());
600
601 return false;
602
603 } catch (NullPointerException _ex) {
604
605 reply.append("<sever:error type=\"command\" name=\"deleteall\" id=\"301\">");
606
607 reply.append(_ex.getMessage());
608
609 reply.append("</server:error>");
610
611 writeResponse(reply.toString());
612
613 throw new Exception("NullPointer");
614
615 } catch(IOException _ex) {
616
617 reply.append("<sever:error type=\"command\" name=\"deleteall\" id=\"307\">");
618
619 reply.append(_ex.getMessage());
620
621 reply.append("</server:error>");
622
623 hlh.writeLog(HLHandler.ERROR_LOG,"302 name=\"deleteall\" "+_ex.getMessage());
624
625 writeResponse(reply.toString());
626
627 throw _ex;
628
629 }
630
631 } else {
632
633 reply.append("<server:error type=\"protocol\">");
634
635 reply.append("Command ");
636
637 reply.append(cmd.getName());
638
639 reply.append(" not implemented");
640
641 reply.append("</server:error>");
642
643 writeResponse(reply.toString());
644
645 return true;
646
647 }
648
649 }
650
651 private void writeResponse(String text) throws IOException {
652
653 if(s_worker!=null) {
654
655 s_worker.getOutputStream().write((text+"\r\n").getBytes());
656
657 }
658
659 }
660
661 private void writeResponse(byte[] data) throws IOException {
662
663 if(s_worker!=null) {
664
665 s_worker.getOutputStream().write(data);
666
667 }
668
669 }
670
671 public Object[] getTokens(String line) {
672
673 if(line==null||line.length()<=0) return null;
674
675 ArrayList tokens = new ArrayList();
676
677 String separator = new String(">");
678
679 int start = 0;
680
681 int end = 0;
682
683 try {
684
685 while(true) {
686
687 end = line.indexOf(separator,start);
688
689 if(end==-1) break;
690
691 if(separator.equals("<")) {
692
693 separator = ">";
694
695 } else {
696
697 separator = "<";
698
699 end++;
700
701 }
702
703 tokens.add(line.substring(start,end));
704
705 start = end;
706
707 }
708
709 return tokens.toArray();
710
711 } catch (Exception e) {
712
713 return null;
714
715 }
716
717 }
718
719 public void stopRequest() {
720
721 noStopRequested = false;
722
723 internalThread.interrupt();
724
725 }
726
727 public boolean isAlive() {
728
729 return internalThread.isAlive();
730
731 }
732
733 }
734