while (it0 != it1 && result. count() <= resultSize)
{
result. append(*it0);
it0Old = it0;
it0++;
pDsForDownloading. modify(it0Old, PD::BlockedChange(true));
}
_rWLockPD. unlock();
}
/// Collects up to `count` pages from the parsing queue into `result` and
/// marks each collected page as blocked.
///
/// `result` is cleared first. The pages are taken from the
/// ByBlockedAndDownloadedAndParsedAndErrorCode index using the key
/// (false, true, false, 0) — presumably "not blocked, downloaded, not parsed,
/// no error"; confirm against the index definition.
/// The whole selection runs under the write lock of `_rWLockPD`.
void BMICDM::getFreePDsForParsing(PDContainer &result, const int &count)
{
    result.clear();
    result.reserve(count);

    _rWLockPD.lockForWrite();
    auto &parseQueue = _storePD.get<PD::ByBlockedAndDownloadedAndParsedAndErrorCode>();
    const auto candidates = parseQueue.equal_range(std::make_tuple(false, true, false, 0));
    auto cursor = candidates.first;
    const auto rangeEnd = candidates.second;
    while (cursor != rangeEnd && result.count() < count)
    {
        result.append(*cursor);
        // Step past the element before modifying it: BlockedChange alters
        // the index key, so the element moves out of the range being walked.
        const auto claimed = cursor++;
        parseQueue.modify(claimed, PD::BlockedChange(true));
    }
    _rWLockPD.unlock();
}
void BMICDM::insertPDs(const PDContainer &pDs)
{
QVector<std::pair<ulong, ulong>> pDIdsToReplace;
QVector<ulong> pDIdsToRemove;
pDIdsToReplace. reserve(pDs. size() / 2);
_rWLockPD. lockForWrite();
for (const PageData &pD: pDs)
{
auto insertion = _storePD. insert(pD);
bool inserted = insertion. second;
if (!inserted)
{
auto &sameUrlPDs = _storePD. get<PD::ByPhDAndNormalizedUrl>();
PDStore::index<PD::ByPhDAndNormalizedUrl>::type::iterator it0, it1;
it0 = sameUrlPDs. find(
std::make_tuple(pD. phD, pD. normalizedUrl));
it1 = sameUrlPDs. end();
if (it0 != it1)
{
PageData replacePD = *it0;
while (replacePD. replaceId && replacePD. id)
{
replacePD = _findById(replacePD. replaceId);
}
pDIdsToReplace. append(std::make_pair(pD. id, replacePD. id));
//правим уровень, если у ранее найденной PD был уровень выше
if (replacePD. level > pD. level)
{
PDStore::index<PD::ById>::type &pDsById = _storePD. get<PD::ById>();
PDStore::index<PD::ById>::type::iterator itById0, itById1;
itById0 = pDsById. find(replacePD. id);
itById1 = pDsById. end();
if (itById0 != itById1)
{
pDsById. modify(itById0, PD::LevelChange(pD. level));
}
}
}
}
else
{
//для заданной глубины
bool depthIsExhausted =pD. phD->maxCrawlLevel >= 0
&& pD. level > pD. phD->maxCrawlLevel;
if (depthIsExhausted)
{
_storePD. erase(insertion. first);
pDIdsToRemove. append(pD. id);
}
}
}
_rWLockPD. unlock();
if (pDIdsToReplace. size())
_changeOrRemovePAs(pDIdsToReplace);
if (pDIdsToRemove. size())
_removePAsByToAndFrom(pDIdsToRemove);
}
void BMICDM::addPDsAndPAs(const PDPAContainer &pDPAs)
{
PAContainer pAs;
pAs. reserve(pDPAs. size());
_rWLockPD. lockForWrite();
auto &sameUrlPDs = _storePD. get<PD::ByPhDAndNormalizedUrl>();
PDStore::index<PD::ByPhDAndNormalizedUrl>::type::iterator it0, it1;
for(const PDPACreateData & pDPA: pDPAs)
{
const PageData &pD = pDPA. pD;
auto phD = pD. phD;
it0 = sameUrlPDs. find(std::make_tuple(phD, pDPA. normalizedUrl));
it1 = sameUrlPDs. end();
if (it0 == it1)
{
PageData newPD(PageData::newIdUnsafe(), pDPA. url, pDPA. normalizedUrl, phD);
newPD. level = pD. level + 1;
newPD. idFrom = pD. id;
_storePD. insert(newPD);
pAs. append(PageArc(pD. id, newPD. id));
}
else
{
PageData replacePD = *it0;
while (replacePD. replaceId && replacePD. id)
{
replacePD = _findById(replacePD. replaceId);
}
//правим уровень, если у ранее найденной PD был уровень выше
uint level = pD. level + 1;
if (replacePD. level > level)
{
PDStore::index<PD::ById>::type &pDsById = _storePD. get<PD::ById>();
PDStore::index<PD::ById>::type::iterator itById0, itById1;
itById0 = pDsById. find(replacePD. id);
itById1 = pDsById. end();
if (itById0 != itById1)
{
pDsById. modify(itById0, PD::LevelChange(level));
}
}
pAs. append(PageArc(pD. id, replacePD. id));
}
}
_rWLockPD. unlock();
_mutexPA. lock();
for (PageArc &pA: pAs)
{
pA. id = PageArc::newIdUnsafe();
_storePA. insert(pA);
}
_mutexPA. unlock();
}
/// Writes back processed pages to the store and queues the resulting arc
/// updates.
///
/// For every page in `pDs`:
///  - if it is not flagged `remove`, its `blocked` flag is cleared and the
///    stored element with the same id is replaced by it. A page whose
///    `errorCode` is `RCCConsts::CONTENT_DUPLICATE` additionally queues the
///    pair (id, replaceId) for arc replacement;
///  - if it is flagged `remove`, or the replacement was rejected by the
///    store, the stored element is erased and either (id, replaceId) is
///    queued for arc replacement (non-zero `replaceId`) or the id is queued
///    for arc removal.
///
/// A page whose id is not present in the store is handled like a removed
/// page, except that nothing is erased: `replace()` and `erase()` require a
/// valid, dereferenceable iterator, so they must not be called with `end()`.
///
/// Store changes happen under the write lock of `_rWLockPD`; the queued arc
/// updates are applied after the lock is released.
void BMICDM::updatePDs(const PDContainer &pDs)
{
    QVector<std::pair<ulong, ulong>> pDIdsToReplace;
    QVector<ulong> pDIdsToRemove;

    _rWLockPD.lockForWrite();
    auto &pDsById = _storePD.get<PD::ById>();
    // Iterate by value on purpose: `blocked` is reset on the local copy
    // before that copy is written back into the store.
    for (PageData pD : pDs)
    {
        const auto it = pDsById.find(pD.id);
        const bool found = (it != pDsById.end());

        bool remove = pD.remove;
        if (!remove)
        {
            pD.blocked = false;
            if (!found || !pDsById.replace(it, pD))
            {
                // Missing element or rejected replacement: fall through to removal.
                remove = true;
            }
            else if (pD.errorCode == RCCConsts::CONTENT_DUPLICATE)
            {
                pDIdsToReplace.append(std::make_pair(pD.id, pD.replaceId));
            }
        }

        if (remove)
        {
            // A failed replace() leaves the element and the iterator intact,
            // so `it` can be reused here without a second lookup.
            if (found)
                pDsById.erase(it);

            if (pD.replaceId)
                pDIdsToReplace.append(std::make_pair(pD.id, pD.replaceId));
            else
                pDIdsToRemove.append(pD.id);
        }
    }
    _rWLockPD.unlock();

    if (!pDIdsToReplace.isEmpty())
        _changeOrRemovePAs(pDIdsToReplace);
    if (!pDIdsToRemove.isEmpty())
        _removePAsByToAndFrom(pDIdsToRemove);
}
/// Checks whether a page with the same content hash is already stored for
/// the given host.
///
/// @param phD      host whose pages are searched
/// @param content  page content; hashed with `PageData::hashContent`
/// @return `(true, page)` with a copy of the first stored page that matches
///         (host, content hash), or `(false, default-constructed PageData)`
///         when there is none.
///
/// The matching page is copied while the read lock of `_rWLockPD` is still
/// held: once the lock is released a writer may erase or relocate the
/// element, so the iterator must not be dereferenced afterwards.
std::pair<bool, PageData> BMICDM::contentSeen(HostData* phD, const QString &content)
{
    PageData pD;

    _rWLockPD.lockForRead();
    const uint contentHash = PageData::hashContent(content);
    const auto matches = _storePD.get<PD::ByPhDAndContentHash>()
            .equal_range(std::make_tuple(phD, contentHash));
    const bool sawContent = (matches.first != matches.second);
    if (sawContent)
        pD = *matches.first;
    _rWLockPD.unlock();

    return std::make_pair(sawContent, pD);
}
/// Looks up the arcs whose `to` key equals `to` while holding `_mutexPA`.
///
/// NOTE(review): the range that is found is never used and `newTo` is never
/// read, so this function currently changes nothing. It looks like an
/// unfinished re-targeting routine — confirm the intended behaviour before
/// relying on it.
void BMICDM::changePAsByTo(const int &to, const int &newTo)
{
    _mutexPA.lock();
    auto &arcsByTo = _storePA.get<PA::ByTo>();
    const auto matching = arcsByTo.equal_range(to);
    (void)matching;
    _mutexPA.unlock();
}
/// Drains up to `size` items from the front of the selected log.
///
/// @param logType  which log to read (`CommonLog` or `ErrorLog`)
/// @param size     maximum number of items to take
/// @return a newly allocated container with the taken items; the caller
///         owns it and is responsible for deleting it. The container is
///         empty when the log is empty or `logType` matches no known log.
///
/// Each log is accessed under its own mutex (`_mutexCL` / `_mutexEL`).
/// The single return after the switch guarantees a value on every path;
/// previously an unmatched `logType` fell off the end of the function.
PDLIContainer *BMICDM::getLog(const DataManager::LogType &logType, const int size)
{
    PDLIContainer *ppDLIs = new PDLIContainer;
    switch (logType)
    {
    case DataManager::CommonLog:
    {
        _mutexCL.lock();
        while (!_commonLog.isEmpty() && ppDLIs->size() < size)
        {
            ppDLIs->append(_commonLog.takeFirst());
        }
        _mutexCL.unlock();
        break;
    }
    case DataManager::ErrorLog:
    {
        _mutexEL.lock();
        while (!_errorLog.isEmpty() && ppDLIs->size() < size)
        {
            ppDLIs->append(_errorLog.takeFirst());
        }
        _mutexEL.unlock();
        break;
    }
    }
    return ppDLIs;
}
/// Appends one item to the selected log.
///
/// `CommonLog` items go to `_commonLog` under `_mutexCL`, `ErrorLog` items
/// go to `_errorLog` under `_mutexEL`. Any other `logType` is ignored.
void BMICDM::insertLogItem(const DataManager::LogType &logType, const PDLogItem &pDLI)
{
    if (logType == DataManager::CommonLog)
    {
        _mutexCL.lock();
        _commonLog.append(pDLI);
        _mutexCL.unlock();
    }
    else if (logType == DataManager::ErrorLog)
    {
        _mutexEL.lock();
        _errorLog.append(pDLI);
        _mutexEL.unlock();
    }
}
/// Appends a batch of items to the selected log.
///
/// `CommonLog` items go to `_commonLog` under `_mutexCL`, `ErrorLog` items
/// go to `_errorLog` under `_mutexEL`. Any other `logType` is ignored.
///
/// The `ErrorLog` branch used to append to `_commonLog` while holding only
/// `_mutexEL`; it now writes to `_errorLog`, the container that mutex is
/// paired with in `insertLogItem` and `getLog`.
void BMICDM::insertLogItems(const DataManager::LogType &logType, const QVector<PDLogItem> &items)
{
    switch (logType)
    {
    case DataManager::CommonLog:
    {
        _mutexCL.lock();
        for (const auto &item : items)
        {
            _commonLog.append(item);
        }
        _mutexCL.unlock();
        break;
    }
    case DataManager::ErrorLog:
    {
        _mutexEL.lock();
        for (const auto &item : items)
        {
            _errorLog.append(item);
        }
        _mutexEL.unlock();
        break;
    }
    }
}
void BMICDM::_changeOrRemovePAs(const QVector<std::pair<ulong, ulong> > &pDIdsToReplace)
{
_mutexPA. lock();
typedef PAStore::index<PA::ByTo>::type::iterator ByToIt;
PAStore::index<PA::ByTo>::type & to = _storePA. get<PA::ByTo>();
for (auto& idPair: pDIdsToReplace)
{
ByToIt it0, it1;
std::tie(it0,it1) = to. equal_range(idPair. first);
QVector<ByToIt> modifyIts;
while (it0 != it1)
{
modifyIts. append(it0);
it0++;
}
for (auto &it: modifyIts)
{
to. modify(it, PA::ToChange(idPair. second));
}
std::tie(it0,it1) = to. equal_range(idPair. first);
to. erase(it0, it1);
|
Из-за большого объема этот материал размещен на нескольких страницах:
1 2 3 4 5 6 7 8 9 |


