// TrackerWatcherCommand.cc
/* <!-- copyright */
/*
 * aria2 - The high speed download utility
 *
 * Copyright (C) 2006 Tatsuhiro Tsujikawa
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * In addition, as a special exception, the copyright holders give
 * permission to link the code of portions of this program with the
 * OpenSSL library under certain conditions as described in each
 * individual source file, and distribute linked combinations
 * including the two.
 * You must obey the GNU General Public License in all respects
 * for all of the code used other than OpenSSL.  If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so.  If you
 * do not wish to do so, delete this exception statement from your
 * version.  If you delete this exception statement from all source
 * files in the program, then also delete it here.
 */
/* copyright --> */
#include "TrackerWatcherCommand.h"

#include <cstring>
#include <sstream>

#include "DownloadEngine.h"
#include "BtAnnounce.h"
#include "BtRuntime.h"
#include "PieceStorage.h"
#include "PeerStorage.h"
#include "Peer.h"
#include "prefs.h"
#include "message.h"
#include "ByteArrayDiskWriterFactory.h"
#include "RecoverableException.h"
#include "PeerInitiateConnectionCommand.h"
#include "DiskAdaptor.h"
#include "FileEntry.h"
#include "RequestGroup.h"
#include "Option.h"
#include "DlAbortEx.h"
#include "Logger.h"
#include "LogFactory.h"
#include "A2STR.h"
#include "SocketCore.h"
#include "Request.h"
#include "AnnounceTier.h"
#include "DownloadContext.h"
#include "bittorrent_helper.h"
#include "a2functional.h"
#include "util.h"
#include "fmt.h"
#include "UDPTrackerRequest.h"
#include "UDPTrackerClient.h"
#include "BtRegistry.h"
#include "NameResolveCommand.h"

namespace aria2 {

HTTPAnnRequest::HTTPAnnRequest(std::unique_ptr<RequestGroup> rg)
74
    : rg_{std::move(rg)}
75 76 77
{
}

// Defaulted destructor: no explicit cleanup is performed here.
HTTPAnnRequest::~HTTPAnnRequest() = default;

// Returns true when the tracker RequestGroup reports zero commands.
bool HTTPAnnRequest::stopped() const
{
  const auto remainingCommands = rg_->getNumCommand();
  return remainingCommands == 0;
}

// Returns whatever the tracker RequestGroup reports from
// downloadFinished().
bool HTTPAnnRequest::success() const
{
  const bool finished = rg_->downloadFinished();
  return finished;
}

// Requests a forced halt of the tracker RequestGroup. The engine
// argument is not used by this implementation.
void HTTPAnnRequest::stop(DownloadEngine* e)
{
  const bool haltRequested = true;
  rg_->setForceHaltRequested(haltRequested);
}

// Creates the initial commands of the tracker RequestGroup and adds
// them to the engine |e|.
//
// Returns true when the commands were queued. A RecoverableException
// raised while doing so is logged and reported as false.
bool HTTPAnnRequest::issue(DownloadEngine* e)
{
  try {
    std::vector<std::unique_ptr<Command>> commands;
    rg_->createInitialCommand(commands, e);
    e->addCommand(std::move(commands));
    e->setNoWait(true);
    A2_LOG_DEBUG("added tracker request command");
    return true;
  }
  catch (RecoverableException& ex) {
    A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
    return false;
  }
}

bool HTTPAnnRequest::processResponse(
    const std::shared_ptr<BtAnnounce>& btAnnounce)
107 108 109 110 111
{
  try {
    std::stringstream strm;
    unsigned char data[2048];
    rg_->getPieceStorage()->getDiskAdaptor()->openFile();
112 113 114 115
    while (1) {
      ssize_t dataLength = rg_->getPieceStorage()->getDiskAdaptor()->readData(
          data, sizeof(data), strm.tellp());
      if (dataLength == 0) {
116 117 118 119 120
        break;
      }
      strm.write(reinterpret_cast<const char*>(data), dataLength);
    }
    std::string res = strm.str();
121 122
    btAnnounce->processAnnounceResponse(
        reinterpret_cast<const unsigned char*>(res.c_str()), res.size());
123
    return true;
124 125 126 127 128 129 130 131 132
  }
  catch (RecoverableException& e) {
    const auto& dctx = rg_->getDownloadContext();
    const auto& fe = dctx->getFirstFileEntry();
    auto uris = fe->getUris();

    A2_LOG_ERROR_EX(fmt("GID#%s - Tracker request %s failed",
                        GroupId::toHex(rg_->getGID()).c_str(), uris[0].c_str()),
                    e);
133 134 135 136
    return false;
  }
}

// Wraps a UDP tracker request. |req| may be empty; the other member
// functions check for that before using it.
UDPAnnRequest::UDPAnnRequest(const std::shared_ptr<UDPTrackerRequest>& req)
    : req_(req)
{
}

// Defaulted destructor: no explicit cleanup is performed here.
UDPAnnRequest::~UDPAnnRequest() = default;

// A request counts as stopped when there is no underlying request
// object, or when its state is UDPT_STA_COMPLETE.
bool UDPAnnRequest::stopped() const
{
  if (!req_) {
    return true;
  }
  return req_->state == UDPT_STA_COMPLETE;
}

// Returns true only when a request object exists, its state is
// UDPT_STA_COMPLETE and its error code is UDPT_ERR_SUCCESS.
bool UDPAnnRequest::success() const
{
  return req_ && req_->state == UDPT_STA_COMPLETE &&
         req_->error == UDPT_ERR_SUCCESS;
}

// Detaches from the underlying request: clears its user_data pointer
// and drops this object's reference to it. After this call stopped()
// returns true. The engine argument is not used.
void UDPAnnRequest::stop(DownloadEngine* e)
{
  if (req_) {
    req_->user_data = nullptr;
    req_.reset();
  }
}

// Queues a NameResolveCommand for the underlying request on engine |e|.
//
// Returns false without doing anything when there is no request object;
// returns true after the command has been added.
bool UDPAnnRequest::issue(DownloadEngine* e)
{
  if (!req_) {
    return false;
  }
  e->addCommand(make_unique<NameResolveCommand>(e->newCUID(), e, req_));
  e->setNoWait(true);
  return true;
}

bool UDPAnnRequest::processResponse(
    const std::shared_ptr<BtAnnounce>& btAnnounce)
177
{
178
  if (req_) {
179 180
    btAnnounce->processUDPTrackerResponse(req_);
    return true;
181 182
  }
  else {
183 184 185 186
    return false;
  }
}

// Registers this command with |requestGroup| (increaseNumCommand) and,
// when the BT registry of |e| provides a UDP tracker client, registers
// as a watcher of that client. Both registrations are undone in the
// destructor.
TrackerWatcherCommand::TrackerWatcherCommand(cuid_t cuid,
                                             RequestGroup* requestGroup,
                                             DownloadEngine* e)
    : Command(cuid),
      requestGroup_(requestGroup),
      e_(e),
      // Read from the constructor argument rather than from e_ so this
      // initializer does not depend on the member declaration order.
      udpTrackerClient_(e->getBtRegistry()->getUDPTrackerClient())
{
  requestGroup_->increaseNumCommand();
  if (udpTrackerClient_) {
    udpTrackerClient_->increaseWatchers();
  }
}

// Reverts the registrations made in the constructor: decrements the
// RequestGroup command count and, if a UDP tracker client is present,
// its watcher count.
TrackerWatcherCommand::~TrackerWatcherCommand()
{
  requestGroup_->decreaseNumCommand();
  if (udpTrackerClient_) {
    udpTrackerClient_->decreaseWatchers();
  }
}

bool TrackerWatcherCommand::execute()
{
  if (requestGroup_->isForceHaltRequested()) {
    if (!trackerRequest_) {
213
      return true;
214 215
    }
    else if (trackerRequest_->stopped() || trackerRequest_->success()) {
216
      return true;
217 218
    }
    else {
219
      trackerRequest_->stop(e_);
220
      e_->setRefreshInterval(std::chrono::milliseconds(0));
221
      e_->addCommand(std::unique_ptr<Command>(this));
222 223 224
      return false;
    }
  }
225
  if (btAnnounce_->noMoreAnnounce()) {
226 227 228
    A2_LOG_DEBUG("no more announce");
    return true;
  }
229
  if (!trackerRequest_) {
230
    trackerRequest_ = createAnnounce(e_);
231
    if (trackerRequest_) {
232
      trackerRequest_->issue(e_);
233
      A2_LOG_DEBUG("tracker request created");
234
    }
235 236
  }
  else if (trackerRequest_->stopped()) {
237 238 239 240
    // We really want to make sure that tracker request has finished
    // by checking getNumCommand() == 0. Because we reset
    // trackerRequestGroup_, if it is still used in other Command, we
    // will get Segmentation fault.
241 242
    if (trackerRequest_->success()) {
      if (trackerRequest_->processResponse(btAnnounce_)) {
243 244
        btAnnounce_->announceSuccess();
        btAnnounce_->resetAnnounce();
245
        addConnection();
246 247
      }
      else {
248
        btAnnounce_->announceFailure();
249
        if (btAnnounce_->isAllAnnounceFailed()) {
250 251 252
          btAnnounce_->resetAnnounce();
        }
      }
253
      trackerRequest_.reset();
254 255
    }
    else {
256 257
      // handle errors here
      btAnnounce_->announceFailure(); // inside it, trackers = 0.
258
      trackerRequest_.reset();
259
      if (btAnnounce_->isAllAnnounceFailed()) {
260 261 262 263
        btAnnounce_->resetAnnounce();
      }
    }
  }
264 265 266 267 268 269

  if (!trackerRequest_ && btAnnounce_->noMoreAnnounce()) {
    A2_LOG_DEBUG("no more announce");
    return true;
  }

270
  e_->addCommand(std::unique_ptr<Command>(this));
271 272 273
  return false;
}

void TrackerWatcherCommand::addConnection()
275
{
276 277
  while (!btRuntime_->isHalt() && btRuntime_->lessThanMinPeers()) {
    if (!peerStorage_->isPeerAvailable()) {
278 279
      break;
    }
280
    cuid_t ncuid = e_->newCUID();
281
    std::shared_ptr<Peer> peer = peerStorage_->checkoutPeer(ncuid);
282
    // sanity check
283
    if (!peer) {
284 285
      break;
    }
286 287
    auto command = make_unique<PeerInitiateConnectionCommand>(
        ncuid, requestGroup_, peer, e_, btRuntime_);
288 289
    command->setPeerStorage(peerStorage_);
    command->setPieceStorage(pieceStorage_);
290
    e_->addCommand(std::move(command));
291
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Adding new command CUID#%" PRId64 "",
292 293 294 295
                     getCuid(), peer->usedBy()));
  }
}

std::unique_ptr<AnnRequest>
297 298
TrackerWatcherCommand::createAnnounce(DownloadEngine* e)
{
299 300
  while (!btAnnounce_->isAllAnnounceFailed() &&
         btAnnounce_->isAnnounceReady()) {
301 302 303
    std::string uri = btAnnounce_->getAnnounceUrl();
    uri_split_result res;
    memset(&res, 0, sizeof(res));
304
    if (uri_split(&res, uri.c_str()) == 0) {
305 306
      // Without UDP tracker support, send it to normal tracker flow
      // and make it fail.
307
      std::unique_ptr<AnnRequest> treq;
308 309
      if (udpTrackerClient_ &&
          uri::getFieldString(res, USR_SCHEME, uri.c_str()) == "udp") {
310
        uint16_t localPort;
311 312 313 314 315 316
        localPort = e->getBtRegistry()->getTcpPort();
        treq =
            createUDPAnnRequest(uri::getFieldString(res, USR_HOST, uri.c_str()),
                                res.port, localPort);
      }
      else {
317 318 319
        treq = createHTTPAnnRequest(btAnnounce_->getAnnounceUrl());
      }
      btAnnounce_->announceStart(); // inside it, trackers++.
320
      return treq;
321 322
    }
    else {
323 324
      btAnnounce_->announceFailure();
    }
325
  }
326
  if (btAnnounce_->isAllAnnounceFailed()) {
327 328
    btAnnounce_->resetAnnounce();
  }
329
  return nullptr;
330 331
}

std::unique_ptr<AnnRequest>
333
TrackerWatcherCommand::createUDPAnnRequest(const std::string& host,
334
                                           uint16_t port, uint16_t localPort)
335
{
336 337 338 339
  auto req = btAnnounce_->createUDPTrackerRequest(host, port, localPort);
  req->user_data = this;

  return make_unique<UDPAnnRequest>(std::move(req));
340 341 342
}

namespace {
343
bool backupTrackerIsAvailable(const std::shared_ptr<DownloadContext>& context)
344
{
345
  auto torrentAttrs = bittorrent::getTorrentAttrs(context);
346
  if (torrentAttrs->announceList.size() >= 2) {
347 348
    return true;
  }
349
  if (torrentAttrs->announceList.empty()) {
350 351
    return false;
  }
352
  if (torrentAttrs->announceList[0].size() >= 2) {
353
    return true;
354 355
  }
  else {
356 357 358 359 360
    return false;
  }
}
} // namespace

std::unique_ptr<AnnRequest>
362
TrackerWatcherCommand::createHTTPAnnRequest(const std::string& uri)
363 364 365
{
  std::vector<std::string> uris;
  uris.push_back(uri);
366 367
  auto option = util::copy(getOption());
  auto rg = make_unique<RequestGroup>(GroupId::create(), option);
368
  if (backupTrackerIsAvailable(requestGroup_->getDownloadContext())) {
369
    A2_LOG_DEBUG("This is multi-tracker announce.");
370 371
  }
  else {
372 373 374 375 376 377 378 379 380 381
    A2_LOG_DEBUG("This is single-tracker announce.");
  }
  rg->setNumConcurrentCommand(1);
  // If backup tracker is available, try 2 times for each tracker
  // and if they all fails, then try next one.
  option->put(PREF_MAX_TRIES, "2");
  // TODO When dry-run mode becomes available in BitTorrent, set
  // PREF_DRY_RUN=false too.
  option->put(PREF_USE_HEAD, A2_V_FALSE);
  // Setting tracker timeouts
382 383
  rg->setTimeout(
      std::chrono::seconds(option->getAsInt(PREF_BT_TRACKER_TIMEOUT)));
384 385 386 387
  option->put(PREF_CONNECT_TIMEOUT,
              option->get(PREF_BT_TRACKER_CONNECT_TIMEOUT));
  option->put(PREF_REUSE_URI, A2_V_FALSE);
  option->put(PREF_SELECT_LEAST_USED_HOST, A2_V_FALSE);
388 389
  auto dctx = std::make_shared<DownloadContext>(
      option->getAsInt(PREF_PIECE_LENGTH), 0, "[tracker.announce]");
390 391
  dctx->getFileEntries().front()->setUris(uris);
  rg->setDownloadContext(dctx);
392
  auto dwf = std::make_shared<ByteArrayDiskWriterFactory>();
393 394 395
  rg->setDiskWriterFactory(dwf);
  rg->setFileAllocationEnabled(false);
  rg->setPreLocalFileCheckEnabled(false);
396 397 398 399
  // Clearing pre- and post handler is not needed because the
  // RequestGroup is not handled by RequestGroupMan.
  rg->clearPreDownloadHandler();
  rg->clearPostDownloadHandler();
400
  dctx->setAcceptMetalink(false);
401 402
  A2_LOG_INFO(fmt("Creating tracker request group GID#%s",
                  GroupId::toHex(rg->getGID()).c_str()));
403
  return make_unique<HTTPAnnRequest>(std::move(rg));
404 405
}

void TrackerWatcherCommand::setBtRuntime(
    const std::shared_ptr<BtRuntime>& btRuntime)
408 409 410 411
{
  btRuntime_ = btRuntime;
}

void TrackerWatcherCommand::setPeerStorage(
    const std::shared_ptr<PeerStorage>& peerStorage)
414 415 416 417
{
  peerStorage_ = peerStorage;
}

void TrackerWatcherCommand::setPieceStorage(
    const std::shared_ptr<PieceStorage>& pieceStorage)
420 421 422 423
{
  pieceStorage_ = pieceStorage;
}

void TrackerWatcherCommand::setBtAnnounce(
    const std::shared_ptr<BtAnnounce>& btAnnounce)
426 427 428 429
{
  btAnnounce_ = btAnnounce;
}

const std::shared_ptr<Option>& TrackerWatcherCommand::getOption() const
431 432 433 434 435
{
  return requestGroup_->getOption();
}

} // namespace aria2