Ticket #340: batch_processing.diff

File batch_processing.diff, 8.2 KB (added by francois.lagunas@…, 11 months ago)

Patch for batch processing

  • ruby/test/unit/index/tc_index.rb

     
    462462    index.close 
    463463  end 
    464464 
     465  def test_index_key_batch0 # batch_update fed a hash literal that repeats its keys
     466    data = { 
     467      "0" => {:id => "0", :val => "one"}, 
     468      "0" => {:id => "0", :val => "two"}, # repeated literal key "0": this entry replaces the one above
     469      "1" =>{:id => "1", :val => "three"}, 
     470      "1" => {:id => "1", :val => "four"}, # repeated literal key "1": this entry replaces the one above
     471    } # data therefore holds two entries: "0" => "two" and "1" => "four"
     472 
     473    index = Index.new(:analyzer => WhiteSpaceAnalyzer.new, 
     474                      :key => :id) 
     475    index.batch_update data 
     476    assert_equal(2, index.size) # one document expected per surviving hash entry
     477    index.close 
     478  end 
     479 
     480  def test_index_key_batch1 # two successive batch_update calls on the same keyed index
     481    data0 = { 
     482      "0" => {:id => "0", :val => "one"}, 
     483      "0" => {:id => "0", :val => "two"}, # repeated literal key "0": this entry replaces the one above
     484      "1" =>{:id => "1", :val => "three"}, 
     485      "2" => {:id => "1", :val => "four"}, # NOTE(review): hash key is "2" but the document's :id is "1" - confirm this mismatch is intended
     486    } # data0 holds three entries, keyed "0", "1" and "2"
     487 
     488    data1 = { 
     489      "0" => {:id => "0", :val => "one"}, 
     490      "3" => {:id => "3", :val => "two"}, 
     491      "2" =>{:id => "2", :val => "three"}, 
     492      "1" => {:id => "1", :val => "four"}, 
     493      "4" => {:id => "4", :val => "four"}, 
     494    } # data1 holds five entries; every key matches its document's :id
     495 
     496    index = Index.new(:analyzer => WhiteSpaceAnalyzer.new, 
     497                      :key => :id) 
     498    index.batch_update data0 
     499    assert_equal(3, index.size) # one document per entry of data0
     500    index.batch_update data1 
     501    assert_equal(5, index.size) # the test expects only the five data1 documents to remain
     502    index.close 
     503  end 
     504 
     505  def test_index_key_delete_batch0 # batch_update followed by batch_delete with an array of ids
     506    data0 = { 
     507      "0" => {:id => "0", :val => "one"}, 
     508      "0" => {:id => "0", :val => "two"}, # repeated literal key "0": replaces the entry above
     509      "1" =>{:id => "1", :val => "three"}, 
     510      "2" => {:id => "2", :val => "four"}, 
     511      "0" => {:id => "0", :val => "four"}, # repeated again: the final value stored for "0" has :val "four"
     512    } # data0 holds three entries, keyed "0", "1" and "2"
     513 
     514    data1 = ["0", "1"]; # ids handed to batch_delete below
     515 
     516    index = Index.new(:analyzer => WhiteSpaceAnalyzer.new, :key => :id) 
     517    index.batch_update data0 
     518 
     519    assert_equal("four", index["0"][:val]) # last of the three "0" literals
     520    assert_equal("three", index["1"][:val]) 
     521    assert_equal("four", index["2"][:val]) 
     522 
     523    assert_equal(3, index.size) 
     524    index.batch_delete data1 
     525    assert_equal(1, index.size) # ids "0" and "1" were deleted
     526    assert_equal("four", index["2"][:val]) # the document with id "2" is the one left
     527 
     528    index.close 
     529  end 
     530 
    465531  def test_index_multi_key 
    466532    index = Index.new(:analyzer => WhiteSpaceAnalyzer.new, 
    467533                      :key => [:id, :table]) 
  • ruby/ext/r_index.c

     
    16271627} 
    16281628 
    16291629/* 
     1630 * Same as frt_iw_delete, but +rterms+ is an Array of terms instead of a single term. 
     1631 */ 
     1632static VALUE 
     1633frt_iw_batch_delete(VALUE self, VALUE rfield, VALUE rterms) 
     1634{ 
     1635    IndexWriter *iw = (IndexWriter *) DATA_PTR(self); 
     1636    iw_batch_delete_term(iw, frt_field(rfield), rterms); /* rterms is passed through as a VALUE, unconverted */
     1637    return self; 
     1638} 
     1639 
     1640/* 
    16301641 *  call-seq: 
    16311642 *     index_writer.field_infos -> FieldInfos 
    16321643 * 
     
    32983309    rb_define_method(cIndexWriter, "commit",        frt_iw_commit, 0); 
    32993310    rb_define_method(cIndexWriter, "add_readers",   frt_iw_add_readers, 1); 
    33003311    rb_define_method(cIndexWriter, "delete",        frt_iw_delete, 2); 
     3312    rb_define_method(cIndexWriter, "batch_delete",  frt_iw_batch_delete, 2); 
    33013313    rb_define_method(cIndexWriter, "field_infos",   frt_iw_field_infos, 0); 
    33023314    rb_define_method(cIndexWriter, "analyzer",      frt_iw_get_analyzer, 0); 
    33033315    rb_define_method(cIndexWriter, "analyzer=",     frt_iw_set_analyzer, 1); 
  • ruby/lib/ferret/index.rb

     
    451451      return self 
    452452    end 
    453453 
     454    # Helper for batch_delete_unlocked: handles a single id according to its type.
     455    def batch_delete_one(id, remaining)  
     456      if id.is_a?(String) or id.is_a?(Symbol)  # string/symbol ids are only collected here, as strings, in +remaining+
     457        remaining << id.to_s 
     458      elsif id.is_a?(Integer) # integer ids are deleted right away through the reader
     459        ensure_reader_open() 
     460        cnt = @reader.delete(id) # NOTE(review): cnt is assigned but never read in this method
     461      else 
     462        raise ArgumentError, "Cannot delete for arg of type #{id.class}" 
     463      end 
     464    end 
     465     
     466    # Unlocked version of batch_delete; batch_delete and batch_update call it inside @dir.synchrolock.
     467    def batch_delete_unlocked(docs) 
     468      remaining = [] # string ids gathered by batch_delete_one for one writer-side batch delete
     469      if docs.is_a?(Hash) 
     470        docs.each do |id, docs|           # only the key is used; NOTE(review): the block parameter reuses the name of the outer +docs+
     471          batch_delete_one(id, remaining) 
     472        end 
     473      elsif docs.is_a?(Array) 
     474        docs.each do |id|           
     475          batch_delete_one(id, remaining) 
     476        end 
     477      else 
     478        raise ArgumentError, "Cannot batch delete for arg of type #{docs.class}" 
     479      end 
     480      if(@reader)  # commit the reader only when a reader object exists
     481        @reader.commit 
     482      end 
     483      ensure_writer_open() 
     484      if remaining.length != 0 
     485        @writer.batch_delete(@id_field, remaining) # all collected ids are deleted in a single writer call
     486      end 
     487      @writer.commit # runs even when +remaining+ is empty
     488      return self 
     489    end 
     490     
     491    # Deletes a set of documents from the index inside a single @dir.synchrolock block.
     492    # +docs+ contains ids, which are processed according to their type;
     493    # see delete(arg) for details.
     494    # docs:: An array of ids to delete, or a hash whose keys are the ids (the values are ignored)
     495    def batch_delete(docs) 
     496      @dir.synchrolock do 
     497        batch_delete_unlocked(docs) 
     498      end 
     499      return self 
     500    end 
     501 
    454502    # Delete all documents returned by the query. 
    455503    #  
    456504    # query:: The query to find documents you wish to delete. Can either be a 
     
    479527    # Update the document referenced by the document number +id+ if +id+ is an 
    480528    # integer or all of the documents which have the term +id+ if +id+ is a 
    481529    # term. 
     530    # To update a set of documents at once, see batch_update, which exists for performance reasons. 
    482531    # 
    483532    # id::      The number of the document to update. Can also be a string 
    484533    #           representing the value in the +id+ field. Also consider using 
     
    498547      end 
    499548    end 
    500549 
     550    # Update the documents contained in +docs+ inside a single @dir.synchrolock block.
     551    # Each entry in +docs+ is an id/doc pair.
     552    # See update(id, new_doc) for details on the way id is processed
     553    # according to its type.
     554    # docs::    The set of documents to be updated 
     555    def batch_update(docs) 
     556      @dir.synchrolock do 
     557        batch_delete_unlocked(docs) # first pass the ids to the batch delete
     558        ensure_writer_open() 
     559        docs.each do |id, new_doc|  # id is not used in this loop; only the new documents are added
     560          @writer << new_doc 
     561        end 
     562        flush() 
     563      end # NOTE(review): no explicit `return self` follows, unlike batch_delete
     564    end 
     565 
     566 
    501567    # Update all the documents returned by the query. 
    502568    # 
    503569    # query::   The query to find documents you wish to update. Can either be 
  • c/include/index.h

     
    924924                            const Config *config); 
    925925extern void iw_delete_term(IndexWriter *iw, const char *field, 
    926926                           const char *term); 
     927extern void iw_batch_delete_term(IndexWriter *iw, const char *field,  
     928                                 VALUE terms); /* terms is read through RARRAY() in index.c, i.e. treated as an array of terms */
    927929extern void iw_close(IndexWriter *iw); 
    928930extern void iw_add_doc(IndexWriter *iw, Document *doc); 
    929931extern int iw_doc_count(IndexWriter *iw); 
  • c/src/index.c

     
    60496049    } 
    60506050} 
    60516051 
     6052void iw_batch_delete_term(IndexWriter *iw, const char *field, VALUE terms) /* delete every document that has any of +terms+ in +field+ */
     6053{ 
     6054    int field_num = fis_get_field_num(iw->fis, field); 
     6055    if (field_num >= 0) { /* a negative field number means nothing is done at all */
     6056        int i; 
     6057        mutex_lock(&iw->mutex); 
     6058        iw_commit_i(iw); 
     6059        do { /* runs exactly once: the loop condition below is the constant 0 */
     6060            SegmentInfos *sis = iw->sis; 
     6061            const int seg_cnt = sis->size; 
     6062            bool did_delete = false; 
     6063            for (i = 0; i < seg_cnt; i++) { /* one reader is opened, committed and closed per segment */
     6064                IndexReader *ir = sr_open(sis, iw->fis, i, false); 
     6065                unsigned int i; /* NOTE(review): shadows the outer int i (segment index) for the rest of this block */
     6066                for (i = 0 ; i < RARRAY(terms)->len ; i++) { /* one pass per term */
     6067                    TermDocEnum *tde = ir->term_docs(ir); /* a fresh enumerator per term, destroyed at the end of this pass */
     6068                    VALUE rterm = RARRAY(terms)->ptr[i]; 
     6069                    const char* term = StringValuePtr(rterm); /* NOTE(review): presumably raises for a non-String element while iw->mutex is held - confirm */
     6070                    ir->deleter = iw->deleter; 
     6071                    stde_seek(tde, field_num, term); 
     6072                    while (tde->next(tde)) { 
     6073                        did_delete = true; 
     6074                        sr_delete_doc_i(ir, STDE(tde)->doc_num); 
     6075                    } 
     6076                    tde_destroy(tde); 
     6077                } 
     6078                sr_commit_i(ir); 
     6079                ir_close(ir); 
     6080            } 
     6081            if (did_delete) { /* segment infos are rewritten only when at least one document was deleted */
     6082                mutex_lock(&iw->store->mutex); 
     6083                sis_write(iw->sis, iw->store, iw->deleter); 
     6084                mutex_unlock(&iw->store->mutex); 
     6085            } 
     6086        } while (0); 
     6087        mutex_unlock(&iw->mutex); 
     6088    } 
     6089} 
     6090 
    60526091static void iw_optimize_i(IndexWriter *iw) 
    60536092{ 
    60546093    int min_segment;