Last active
September 26, 2016 18:59
-
-
Save jasonmimick/128ecdc93247ea884421f1f59753c307 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// fts_command.h
/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand | |
#include <string> | |
#include <vector> | |
#include <set> | |
#include <list> | |
#include "mongo/base/string_data.h" | |
#include "mongo/db/commands.h" | |
#include "mongo/db/fts/fts_util.h" | |
#include "mongo/util/mongoutils/str.h" | |
#include "mongo/util/timer.h" | |
#include "mongo/db/catalog/database.h" | |
#include "mongo/db/ops/insert.h" | |
#include "mongo/db/catalog/database_holder.h" | |
#include "mongo/db/index/fts_access_method.h" | |
#include "mongo/db/db.h" | |
#include "mongo/db/db_raii.h" | |
#include "mongo/db/dbhelpers.h" | |
#include "mongo/db/catalog/index_create.h" | |
#include "mongo/util/log.h" | |
#include "mongo/db/concurrency/write_conflict_exception.h" | |
#include "mongo/db/repl/replication_coordinator_global.h" | |
#include "mongo/db/repl/repl_client_info.h" | |
#include "mongo/db/namespace_string.h" | |
#include "mongo/db/dbdirectclient.h" | |
#include "mongo/util/scopeguard.h" | |
namespace mongo { | |
namespace fts { | |
using namespace mongoutils; | |
using std::string; | |
using std::stringstream; | |
using std::vector; | |
using std::set; | |
/* select textTerms(*) */ | |
class CmdTextTermBuild2 : public Command { | |
public: | |
OperationContext* _txn; | |
virtual bool isWriteCommandForConfigServer() const { return false; } | |
CmdTextTermBuild2() : Command("textTermBuild2") { } | |
virtual bool logTheOp() { return true; } | |
virtual bool slaveOk() const { return false; } | |
virtual bool slaveOverrideOk() const { return true; } | |
virtual bool maintenanceOk() const { return false; } | |
virtual bool adminOnly() const { return false; } | |
virtual void help(stringstream& help) const { help << "text terms in collection"; } | |
virtual void addRequiredPrivileges(const std::string& dbname, | |
const BSONObj& cmdObj, | |
std::vector<Privilege>* out) { | |
ActionSet actions; | |
actions.addAction(ActionType::find); | |
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); | |
} | |
bool InsertTerms(const std::string& dbName, Collection* collTerm, NamespaceString nsTerms, const FTSAccessMethod* fam, bool isForward) | |
{ | |
log() << "InsertTerms called on textTermBuild2"; | |
std::unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(_txn, MODE_IX)); | |
Lock::CollectionLock colLock(_txn->lockState(), nsTerms.ns(), MODE_IX); | |
std::unique_ptr<SortedDataInterface::Cursor> cursor = fam->newCursor(_txn, isForward); | |
boost::optional<IndexKeyEntry> kv = cursor->seek(BSONObj(), true, SortedDataInterface::Cursor::kWantKey); | |
bool bReturn = false; | |
//std::unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(_txn->lockState())); | |
int nCount = 0; | |
int nIndex = 0; | |
time_t lastLog(0); | |
set<string > setTerms; | |
while (kv) | |
{ | |
if (nCount % 128 == 127) { | |
time_t now = time(0); | |
if (now - lastLog >= 60) { | |
// report progress | |
if (lastLog) | |
log() << "textTermBuild2 InsertTerms " << dbName << ' ' << nCount << std::endl; | |
lastLog = now; | |
} | |
_txn->checkForInterrupt(); | |
scopedXact.reset(); | |
//colLock.reset(); | |
CurOp::get(_txn)->yielded(); | |
scopedXact.reset(new ScopedTransaction(_txn, MODE_IX)); | |
//colLock.reset(new Lock::GlobalWrite(_txn->lockState())); | |
// Check if everything is still all right. | |
/* | |
if (_txn->writesAreReplicated()) { | |
uassert( | |
100001, | |
str::stream() << "Cannot write to ns: " << collTerm | |
<< " after yielding", | |
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(collTerm)); | |
} | |
*/ | |
// TODO: SERVER-16598 abort if original db or collection is gone. | |
/* | |
Database* db = dbHolder().get(_txn, dbName); | |
uassert(100002, | |
str::stream() << "Database " << dbName << " dropped while cloning", | |
db != NULL); | |
Collection* collection = NULL; | |
collection = db->getCollection(collTerm->toString()); | |
uassert(100003, | |
str::stream() << "Collection " << collTerm | |
<< " dropped while cloning", | |
collection != NULL); | |
*/ | |
} | |
BSONObj obj = kv->key; | |
BSONObjIterator keyDataIt(obj); | |
if (keyDataIt.more()) { | |
bReturn = true; | |
BSONElement keyDataElt = keyDataIt.next(); | |
string strTerm = keyDataElt.String(); | |
if (setTerms.find(strTerm) == setTerms.end()) | |
{ | |
setTerms.insert(strTerm); | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { | |
_txn->checkForInterrupt(); | |
WriteUnitOfWork wunit(_txn); | |
Status status = collTerm->insertDocument(_txn, BSON("_id" << nIndex << "term" << strTerm), true); | |
//log() << "textTermBuild2:" << status.toString(); | |
nCount++; | |
nIndex++; | |
if (!status.isOK()) { | |
error() << "error: exception textTermBuild2 InsertTerms object in " << "PUT COLLECTION NAME HERE" << ' ' | |
<< status << " term:" << strTerm; | |
} | |
uassertStatusOK(status); | |
wunit.commit(); | |
} | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "textTermBuild2 InsertTerms", "PUT COLLECTION NAME HERE"); | |
} | |
} | |
kv = cursor->next(SortedDataInterface::Cursor::kWantKey); | |
} | |
return bReturn; | |
} | |
virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { | |
_txn = txn; | |
string targetColl = cmdObj.firstElement().String(); | |
string ns = dbname + "." + cmdObj.firstElement().String(); // test.Foo | |
string nsTerm = ns + "_Terms"; // test.Foo_Terms | |
string cTermNew = cmdObj.firstElement().String() + "_Terms.new"; //Foo_Terms.new | |
string nsTermNew = dbname + "." + cTermNew; // test.Foo_Terms.new | |
log() << "textTermBuild2 neTermNew=" << nsTermNew << std::endl; | |
NamespaceString nsTargetColl(dbname, targetColl); | |
NamespaceString nsTermsColl(dbname,targetColl+"_Terms"); | |
NamespaceString nsTermsCollNew(dbname,targetColl+"_Terms.new"); | |
Status status = userAllowedWriteNS(nsTermsColl); | |
if (!status.isOK()) { | |
return appendCommandStatus(result, status); | |
} | |
// 1. drop <coll>_Terms.new | |
// 2. create <coll>_Terms.new | |
// 3. createIndex <coll>_Terms.new { "term" : 1 } | |
// now we know we have to create index(es) | |
// Note: createIndexes command does not currently respect shard versioning. | |
ScopedTransaction transaction(_txn, MODE_IX); | |
Lock::DBLock dbLock(_txn->lockState(), nsTermsCollNew.db(), MODE_X); | |
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsTermsCollNew)) { | |
return appendCommandStatus( | |
result, | |
Status(ErrorCodes::NotMaster, | |
str::stream() << "Not primary while creating indexes in " << nsTermsCollNew.ns())); | |
} | |
Database* db = dbHolder().get(_txn, nsTermsCollNew.db()); | |
if (!db) { | |
db = dbHolder().openDb(txn, nsTermsCollNew.db()); | |
} | |
Collection* collTermsCollNew = db->getCollection(nsTermsCollNew.ns()); | |
// the <coll>_Terms.new should never be there! it's always renamed at | |
// the end of this command | |
if (!collTermsCollNew) { | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { | |
WriteUnitOfWork wunit(_txn); | |
collTermsCollNew = db->createCollection(txn, nsTermsCollNew.ns(), CollectionOptions()); | |
invariant(collTermsCollNew); | |
collTermsCollNew->getIndexCatalog()->createIndexOnEmptyCollection(_txn,BSON("terms"<<1)); | |
wunit.commit(); | |
} | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "create:"+nsTermNew, nsTermsCollNew.ns()); | |
} | |
auto client = _txn->getClient(); | |
ScopeGuard lastOpSetterGuard = | |
MakeObjGuard(repl::ReplClientInfo::forClient(client), | |
&repl::ReplClientInfo::setLastOpToSystemLastOpTime, | |
_txn); | |
// 4. find text index in <coll> | |
// 5. get FTSAccessMethod | |
vector<IndexDescriptor*> idxMatches; | |
Collection* collTarget = db->getCollection(nsTargetColl.ns()); | |
collTarget->getIndexCatalog()->findIndexByType(_txn, IndexNames::TEXT, idxMatches, false); | |
if (idxMatches.empty()) { | |
errmsg = "text index required for textTerms command"; | |
return false; | |
} | |
if (idxMatches.size() > 1) { | |
errmsg = "more than one text index found for textTerms command"; | |
return false; | |
} | |
invariant(idxMatches.size() == 1); | |
IndexDescriptor* index = idxMatches[0]; | |
const FTSAccessMethod* fam = | |
static_cast<FTSAccessMethod*>(collTarget->getIndexCatalog()->getIndex(index)); | |
invariant(fam); | |
_txn->recoveryUnit()->abandonSnapshot(); | |
dbLock.relockWithMode(MODE_IX); | |
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsTargetColl)) { | |
return appendCommandStatus( | |
result, | |
Status(ErrorCodes::NotMaster, | |
str::stream() << "Not primary while creating background indexes in " | |
<< nsTargetColl.ns())); | |
} | |
// 6. call InsertTerms into <coll>_Terms.new | |
InsertTerms(dbname, collTermsCollNew, nsTermsCollNew,fam, true); | |
Lock::DBLock dbLock2(_txn->lockState(), nsTermsColl.db(), MODE_X); | |
// 7. Drop <coll>_Terms | |
// 8. Rename <coll>_Terms -> <coll>_Terms.new | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { | |
WriteUnitOfWork wunit(_txn); | |
log() << "textTermBuild2 rename start from:" << nsTermNew << " to:" << nsTerm << std::endl; | |
log() << "textTermBuild2 rename try drop:" << nsTerm << std::endl; | |
Status ds = db->dropCollection(_txn, nsTerm); | |
if (ds.isOK()) { | |
log() << "textTermBuild2 rename dropped " << nsTerm << std::endl; | |
} else { | |
result.append("error",ds.reason()); | |
errmsg = ds.reason(); | |
error() << "textTermBuild2 drop error:" << ds.reason() << std::endl; | |
return false; | |
} | |
Status status = db->renameCollection(_txn, nsTermNew, nsTerm, true /* ?stayTemp? this will drop if exists?*/); | |
log() << "textTermBuild2 rename done" << std::endl; | |
if (!status.isOK()) { | |
result.append("error",status.reason()); | |
errmsg = status.reason(); | |
error() << "textTermBuild2 rename error:" << status.reason() << std::endl; | |
return false; | |
} | |
wunit.commit(); | |
} | |
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "textTermBuild", nsTermsColl.ns()); | |
lastOpSetterGuard.Dismiss(); | |
result.append("ok", 1); | |
return true; | |
} | |
}CmdTextTermBuild2; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment