{
finish = true;
break;
}
}
if (finish)
{
_stopThreads();
int crawlingTime = runTimer. elapsed() / 1000;
QStringList rUNames = psett->value( "ResultUnloaders", "r").toString()
.split(",");
for (auto rUName: rUNames)
{
ResultUnloader *prU = ObjectCreator::resultUnloader(rUName. trimmed(), _pdM);
prU->unloadResult();
delete prU;
}
int unloadTime = runTimer. elapsed() / 1000 - crawlingTime;
qDebug() << "Work is done." << endl
<< "Crawling time: " << crawlingTime << "s." << endl
<< "Unload time: " << unloadTime << "s." << endl
<< "Press Enter to exit";
std::cin. get();
break;
}
}
return 0;
}
/**
 * Creates the DataManager named by the "DataManager" setting, publishes it
 * through _app->pdM and seeds it from the hosts file.
 *
 * Each non-comment line of the hosts file is split on ';' and must provide at
 * least four fields: field 0 is passed as the first HostData constructor
 * argument and to RobotsTxt::readRules(); fields 1, 3 and 2 (in that order)
 * are converted with toInt() and passed as the remaining constructor
 * arguments. For every host a root PageData ("/") is queued and all of them
 * are inserted into the data manager in one call.
 *
 * Lines starting with '#' are skipped. Lines with fewer than four fields
 * (including blank lines) are skipped instead of being indexed out of range.
 *
 * @return the data manager stored in _app->pdM.
 * @throws int (value 0) after requesting application exit when the hosts file
 *         is missing or cannot be opened for reading.
 */
DataManager* TextFileAM::_setupDataManager()
{
    RCCSettings *psett = RCCSettings::instance();
    QString name = psett->value( "DataManager", "r").toString();
    DataManager *pdM = ObjectCreator::dataManager(name);
    _app->pdM = pdM;

    // NOTE(review): the file name literal contains a space after the dot;
    // it is preserved as found - confirm whether "hosts.txt" was intended.
    QFile hostsF("hosts. txt");
    if (!hostsF.exists())
    {
        qDebug() << "Place hosts. txt in application folder.";
        static_cast<RCCApplication*>(qApp)->exit();
        throw 0;
    }
    // An existing but unreadable file is handled like a missing one instead
    // of silently continuing with an empty host list.
    if (!hostsF.open(QIODevice::ReadOnly))
    {
        qDebug() << "Cannot open" << hostsF.fileName() << "for reading:"
                 << hostsF.errorString();
        static_cast<RCCApplication*>(qApp)->exit();
        throw 0;
    }

    PDContainer pDs;
    QString robotsTxtClass = psett->value( "RobotsTxtClass", "r").toString();
    RobotsTxt *prTxt = HelperCreator::robotsTxt(robotsTxtClass);

    while (!hostsF.atEnd())
    {
        const QByteArray line = hostsF.readLine();
        if (line.startsWith('#'))
            continue;

        const QStringList lineParts = QString(line).split(';');
        if (lineParts.size() < 4)
        {
            // Blank lines are ignored quietly; anything else is reported.
            if (!line.trimmed().isEmpty())
                qDebug() << "Skipping malformed hosts line:" << line;
            continue;
        }

        HostData *phD = new HostData(lineParts.at(0), lineParts.at(1).toInt(),
                                     lineParts.at(3).toInt(), lineParts.at(2).toInt());
        phD->rules = prTxt->readRules(lineParts.at(0));
        _app->pdM->addHost(phD);
        pDs.append(PageData(PageData::newIdSafe(), "/", "/", phD));
    }

    delete prTxt;
    pdM->insertPDs(pDs);
    hostsF.close();
    return _app->pdM;
}
void TextFileAM::_setupThreads()
{
RCCSettings *psett = RCCSettings::instance();
QString name = psett->value( "DownloadingThread", "r").toString();
DownloadingThread *pdT = ObjectCreator::downloadingThread(name, _pdM);
_app->pdownloadingThread = pdT;
pdT->moveToThread(pdT);
pdT->pnam->moveToThread(pdT);
name = psett->value( "ParsingThread", "r").toString();
int parsingThreadsCount = psett->value( "ParsingThreadsCount", "r").toInt();
for (int i = 0; i < parsingThreadsCount; i++)
{
ParsingThread *ppT = ObjectCreator::parsingThread(name, _pdM);
_app->parsingThreadsList. append(ppT);
ppT->moveToThread(ppT);
}
name = psett->value( "RoutineThread", "r").toString();
RoutineThread *prT = ObjectCreator::storingThread(name, _pdM);
_app->proutineThread = prT;
prT->moveToThread(prT);
}
void TextFileAM::_setupApplicationFinishers()
{
RCCSettings *psett = RCCSettings::instance();
QStringList names = psett->value( "ApplicationFinishers", "r")
.toString().split(",");
for (QString &name: names)
{
name = name. trimmed();
_appFs. append(HelperCreator::applicationFinisher(name));
}
}
/**
 * Requests shutdown of all worker threads by setting their `stop` flags:
 * first the downloading thread, then every parsing thread, then the routine
 * thread. Only the flags are set here; this function does not wait for any
 * thread.
 */
void TextFileAM::_stopThreads()
{
    _app->pdownloadingThread->stop = true;

    const int parserTotal = _app->parsingThreadsList.size();
    for (int idx = 0; idx < parserTotal; ++idx)
    {
        _app->parsingThreadsList.at(idx)->stop = true;
    }

    _app->proutineThread->stop = true;
}
data_managers/bmicdm.h
#ifndef BMICDATAMANAGER_H
#define BMICDATAMANAGER_H
#include "datamanager. h"
#include "data_structures/pdstore. h"
#include "data_structures/pastore. h"
// BMICDM: a DataManager implementation that keeps page data (PD) and page
// associations (PA) in Boost.MultiIndex containers (PDStore / PAStore).
// The visible definitions take _rWLockPD around _storePD accesses and
// _mutexPA around _storePA accesses.
class BMICDM: public DataManager
{
public:
BMICDM();
// Appends phD to the host list; the visible definition always returns true.
virtual bool addHost(HostData* phD);
// Returns true only when no PD is blocked and no PD is waiting to be
// downloaded or parsed (see the definition for the exact index lookups).
virtual bool allWorkIsDone();
// Returns a copy of the registered host list.
virtual QVector<HostData*> getHosts();
// Clears `result` and fills it, in id order, with PDs whose id is >= `id`.
// NOTE(review): the definition names the last parameter `count` and uses it
// as an element limit, while the declaration calls it `countMultiplier` and
// defaults it to PAGEDATA_CHUNK_SIZE_COUNT_MULTIPLIER - this default looks
// swapped with the one on getFreePDsForDownloading; confirm against the
// DataManager base class.
virtual void getPDsStartingFromId(PDContainer &result, const ulong &id,
const int& countMultiplier = RCCConsts::PAGEDATA_CHUNK_SIZE_COUNT_MULTIPLIER);
// Clears `result` and fills it, in id order, with at most `count` PAs whose
// id is >= `id`.
virtual void getPAsStartingFromId(PAContainer &result, const ulong &id,
const int& count = RCCConsts::PAGEDATA_DEFAULT_CHUNK_SIZE);
// Collects PDs of host `phD` that are neither blocked nor downloaded; the
// definition reserves phD->maxDownloadsAtTime * countMultiplier entries.
// NOTE(review): the multiplier defaults to PAGEDATA_DEFAULT_CHUNK_SIZE - see
// the note on getPDsStartingFromId.
virtual void getFreePDsForDownloading(PDContainer &result, HostData *phD,
const int& countMultiplier = RCCConsts::PAGEDATA_DEFAULT_CHUNK_SIZE);
// Definition not visible here; presumably the parsing counterpart of
// getFreePDsForDownloading - confirm.
virtual void getFreePDsForParsing(PDContainer &result, const int& count = RCCConsts::PAGEDATA_DEFAULT_CHUNK_SIZE);
// Bulk PD/PA mutation entry points (definitions not visible here).
virtual void insertPDs(const PDContainer &pDs);
virtual void addPDsAndPAs(const PDPAContainer &pDPAs);
virtual void updatePDs(const PDContainer &pDs);
virtual std::pair<bool, PageData> contentSeen(HostData* phD, const QString& content);
virtual void insertPAs(const PAContainer& pAs);
virtual void changePAsByTo(const int &to, const int &newTo);
// Log access, selected by DataManager::LogType (definitions not visible here).
virtual PDLIContainer* getLog(const DataManager::LogType &logType, const int size = INT_MAX);
virtual void insertLogItem(const DataManager::LogType &logType, const PDLogItem& pDLI);
virtual void insertLogItems(const DataManager::LogType &logType, const QVector<PDLogItem> &items);
protected:
// Internal PA maintenance helpers and PD lookup by id (definitions not
// visible here).
void _changeOrRemovePAs(const QVector<std::pair<ulong, ulong>> &pDIdsToReplace);
void _removePAsByToAndFrom(const QVector<ulong> &pDIdsToRemove);
PageData _findById(ulong id);
// Registered hosts, in insertion order. Stored as raw pointers; ownership is
// not established by the visible code.
QVector<HostData*> _hosts;
// Multi-index stores for page data and page associations.
PDStore _storePD;
PAStore _storePA;
// Log buffers; presumably selected via DataManager::LogType - confirm.
PDLIContainer _commonLog;
PDLIContainer _errorLog;
// Read/write lock taken around _storePD accesses.
QReadWriteLock _rWLockPD;
// Mutex taken around _storePA accesses.
QMutex _mutexPA;
// Presumably guard _commonLog and _errorLog respectively - confirm.
QMutex _mutexCL;
QMutex _mutexEL;
};
#endif // BMICDATAMANAGER_H
data_managers/bmicdm.cpp
#include "bmicdm. h"
// Default construction: every member is default-initialized, no extra setup.
BMICDM::BMICDM() = default;
/**
 * Registers a host by appending its pointer to the host list.
 *
 * @param phD host to register; stored as-is, no validation or de-duplication.
 * @return always true.
 */
bool BMICDM::addHost(HostData* phD)
{
    _hosts.push_back(phD);
    return true;
}
bool BMICDM::allWorkIsDone()
{
PDStore::index<PD::ByBlocked>::type::iterator it0, it1;
auto &blockedPDs = _storePD. get<PD::ByBlocked>();
it0 = blockedPDs. find(true);
it1 = blockedPDs. end();
if (it0 != it1)
{
PageData testPD = *it0;
QVector<PDLogItem> log;
log. reserve(5);
log. append(PDLogItem("------------------------------", PageData()));
log. append(PDLogItem("", testPD));
log. append(PDLogItem(QString::number(_storePD. size()) + " - PD count", PageData()));
log. append(PDLogItem(QString::number(_storePA. size()) + " - PA count", PageData()));
log. append(PDLogItem("------------------------------", PageData()));
insertLogItems(DataManager::CommonLog, log);
return false;
}
_rWLockPD. lockForRead();
//есть ли работа для скачивания
for (HostData* phD: _hosts)
{
PDStore::index<PD::ByPhDAndBlockedAndDownloaded>::type::iterator it2, it3;
std::tie(it2,it3) = _storePD. get<PD::ByPhDAndBlockedAndDownloaded>()
.equal_range(std::make_tuple(phD, false, false));
if (it2 != it3)
{
_rWLockPD. unlock();
return false;
}
}
//есть ли работа для разбора
PDStore::index<PD::ByBlockedAndDownloadedAndParsedAndErrorCode>::type::iterator it4, it5;
std::tie(it4,it5) = _storePD. get<PD::ByBlockedAndDownloadedAndParsedAndErrorCode>()
.equal_range(std::make_tuple(false, true, false, 0));
if (it4 != it5)
{
_rWLockPD. unlock();
return false;
}
_rWLockPD. unlock();
return true;
}
/**
 * Returns the registered hosts.
 *
 * @return a copy of the internal host list (the pointers themselves are
 *         shared with the data manager).
 */
QVector<HostData*> BMICDM::getHosts()
{
    QVector<HostData*> hostsCopy(_hosts);
    return hostsCopy;
}
/**
 * Copies a chunk of page data, ordered by id, into `result`.
 *
 * @param result output container; cleared first, then filled.
 * @param id     lower bound (inclusive) for the ids to return.
 * @param count  maximum number of entries to copy.
 *
 * The store is traversed under the _rWLockPD read lock.
 */
void BMICDM::getPDsStartingFromId(PDContainer &result, const ulong &id, const int &count)
{
    result.clear();
    result.reserve(count);

    _rWLockPD.lockForRead();
    auto &pDsById = _storePD.get<PD::ById>();
    for (auto cursor = pDsById.lower_bound(id);
         cursor != pDsById.end() && result.count() < count;
         ++cursor)
    {
        result.append(*cursor);
    }
    _rWLockPD.unlock();
}
/**
 * Copies a chunk of page associations, ordered by id, into `result`.
 *
 * @param result output container; cleared first, then filled.
 * @param id     lower bound (inclusive) for the ids to return.
 * @param count  maximum number of entries to copy.
 *
 * The store is traversed while holding _mutexPA.
 */
void BMICDM::getPAsStartingFromId(PAContainer &result, const ulong &id, const int &count)
{
    result.clear();
    result.reserve(count);

    _mutexPA.lock();
    auto &pAsById = _storePA.get<PA::ById>();
    for (auto cursor = pAsById.lower_bound(id);
         cursor != pAsById.end() && result.count() < count;
         ++cursor)
    {
        result.append(*cursor);
    }
    _mutexPA.unlock();
}
void BMICDM::getFreePDsForDownloading(PDContainer &result, HostData *phD, const int &countMultiplier)
{
result. clear();
int resultSize = phD->maxDownloadsAtTime * countMultiplier;
result. reserve(resultSize);
_rWLockPD. lockForWrite();
auto &pDsForDownloading = _storePD. get<PD::ByPhDAndBlockedAndDownloaded>();
PDStore::index<PD::ByPhDAndBlockedAndDownloaded>::type::iterator it0, it1, it0Old;
std::tie(it0,it1) = pDsForDownloading. equal_range(std::make_tuple(phD, false, false));
|
Из-за большого объема этот материал размещен на нескольких страницах:
1 2 3 4 5 6 7 8 9 |


