Created February 23, 2022 21:53
-
-
Save jamesr66a/8fdc91eed32c1cac8b18918cfcc16c61 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/torch/csrc/distributed/rpc/rref_context.cpp b/torch/csrc/distributed/rpc/rref_context.cpp | |
index 004e9422be..28b2bbc5c2 100644 | |
--- a/torch/csrc/distributed/rpc/rref_context.cpp | |
+++ b/torch/csrc/distributed/rpc/rref_context.cpp | |
@@ -4,6 +4,9 @@ | |
#include <sstream> | |
+#include <iostream> | |
+#include <unistd.h> | |
+ | |
namespace torch { | |
namespace distributed { | |
namespace rpc { | |
@@ -101,6 +104,11 @@ std::vector<c10::intrusive_ptr<RRef>> RRefContext::destroyInstance( | |
deletedRRefs.emplace_back(std::move(rref)); | |
} | |
} | |
+ { // DEBUG trace "(pid) file:line"; std::endl below is the only line terminator | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
ctx.owners_.clear(); | |
ctx.pendingOwners_.clear(); | |
return deletedRRefs; | |
@@ -281,6 +289,11 @@ void RRefContext::delAllUsersAndUnforkedOwners( | |
TORCH_CHECK( | |
iter != owners_.end(), | |
c10::str("Did not find OwnerRRef with RRefId: ", rrefId)); | |
+ { // DEBUG trace "(pid) file:line rrefId" just before the owner entry is erased | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
owners_.erase(iter); | |
} | |
} | |
@@ -315,6 +328,11 @@ c10::intrusive_ptr<OwnerRRef> RRefContext::getOrCreateOwnerRRef( | |
std::lock_guard<std::mutex> lock(mutex_); | |
const auto iter = owners_.find(rrefId); | |
if (iter == owners_.end()) { | |
+ if (rrefId.localId_ == 30) { // NOTE(review): hard-coded debug filter on a single RRef local id | |
+ std::ostringstream msg; | |
+ msg << "(" << getpid() << ") ^^^^ Scenario 1 (created_on=" << rrefId.createdOn_ << ", local_id=" << rrefId.localId_ << ")"; | |
+ std::cerr << msg.str() << std::endl; | |
+ } | |
// Scenario (1) the first time this owner knows about this RRef | |
// | |
// NB: cannot use make_shared here as the constructor of OwnerRRef is | |
@@ -331,6 +349,11 @@ c10::intrusive_ptr<OwnerRRef> RRefContext::getOrCreateOwnerRRef( | |
} | |
return rref; | |
} else { | |
+ if (rrefId.localId_ == 30) { // NOTE(review): same hard-coded debug filter as the scenario-1 trace | |
+ std::ostringstream msg; | |
+ msg << "(" << getpid() << ") ^^^^ Scenario 2 (created_on=" << rrefId.createdOn_ << ", local_id=" << rrefId.localId_ << ")"; | |
+ std::cerr << msg.str() << std::endl; | |
+ } | |
// Scenario (2) retrieving an existing RRef | |
auto ownerRRef = fromRRefInterface(iter->second); | |
// Now double check if the two types match | |
@@ -446,6 +469,11 @@ RRefForkData RRefContext::prepareChildFork( | |
// this is needed for OwnerRRefs that were created locally. | |
{ | |
std::lock_guard<std::mutex> lock(mutex_); | |
+ { // DEBUG trace "(pid) file:line rrefId" before the OwnerRRef is stored in owners_ | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rref->rrefId(); | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
owners_[rref->rrefId()] = rref; | |
} | |
} else { | |
@@ -714,6 +742,11 @@ void RRefContext::finishForkRequest(const ForkId& forkId, worker_id_t parent) { | |
void RRefContext::addSelfAsFork(c10::intrusive_ptr<OwnerRRef>& rref) { | |
std::lock_guard<std::mutex> lock(mutex_); | |
const auto& rrefId = rref->rrefId(); | |
+ { // DEBUG trace "(pid) file:line rrefId" before the OwnerRRef is stored in owners_ | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
owners_[rrefId] = rref; | |
auto& rrefForks = forks_[rrefId]; | |
TORCH_INTERNAL_ASSERT( | |
@@ -731,6 +764,11 @@ void RRefContext::addForkOfOwner(const RRefId& rrefId, const ForkId& forkId) { | |
"Got fork notification twice on the same RRef ", | |
forkId); | |
rrefForks.insert(forkId); | |
+ { // DEBUG trace "(pid) file:line rrefId forkId" after the fork is recorded | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
} | |
void RRefContext::addForkOfOwnerIfNotPresent( | |
@@ -744,9 +782,19 @@ void RRefContext::addForkOfOwnerIfNotPresent( | |
// function are idempotent. | |
if (rrefForks.find(forkId) == rrefForks.end()) { | |
rrefForks.insert(forkId); | |
+ { // DEBUG trace "(pid) file:line rrefId forkId" when a new fork is inserted | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
} else { | |
LOG(INFO) << "Ignoring duplicate request to add Fork of OwnerRRef with " | |
<< "RRefId = " << rrefId << ", ForkId = " << forkId; | |
+ { // DEBUG trace for the duplicate branch; prints forkId too so it matches the insert branch | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
} | |
} | |
@@ -761,6 +809,11 @@ c10::intrusive_ptr<RRef> RRefContext::delForkOfOwner( | |
// statements to ensure this function is idempotent. This makes it safe to | |
// retry RRefUserDelete messages. | |
{ | |
+ { // DEBUG trace "(pid) file:line rrefId forkId"; emitted before mutex_ is acquired | |
+ std::stringstream ss; | |
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId; | |
+ std::cerr << ss.str() << std::endl; | |
+ } | |
std::lock_guard<std::mutex> lock(mutex_); | |
auto rrefIter = forks_.find(rrefId); | |
if (rrefIter != forks_.end()) { | |
diff --git a/torch/csrc/distributed/rpc/rref_impl.cpp b/torch/csrc/distributed/rpc/rref_impl.cpp | |
index c8b98f8cea..012a614ee3 100644 | |
--- a/torch/csrc/distributed/rpc/rref_impl.cpp | |
+++ b/torch/csrc/distributed/rpc/rref_impl.cpp | |
@@ -10,6 +10,11 @@ | |
#include <torch/csrc/distributed/rpc/rref_proto.h> | |
#include <torch/csrc/distributed/rpc/utils.h> | |
+#include <thread> | |
+#include <iostream> | |
+#include <unistd.h> | |
+#include <sstream> | |
+ | |
namespace { | |
// If the type is subtype of named type, return its qualifiedname, otherwise | |
// return its type str. | |
@@ -248,6 +253,9 @@ OwnerRRef::OwnerRRef( | |
std::vector<c10::Device> devices) | |
: RRef(ownerId, rrefId, type) { | |
future_ = c10::make_intrusive<JitFuture>(type_, std::move(devices)); | |
+ std::ostringstream msg; // DEBUG: log construction together with the address of the new future | |
+ msg << "(" << getpid() << ") Instantiating OwnerRRef " << rrefId << " with future " << future_.get(); | |
+ std::cerr << msg.str() << std::endl; | |
if (value.has_value()) { | |
future_->markCompleted(value.value()); | |
@@ -255,6 +263,9 @@ OwnerRRef::OwnerRRef( | |
} | |
const IValue& OwnerRRef::getValue() const { | |
+ std::ostringstream msg; // DEBUG: printed before the timed-out check below, which may still throw | |
+ msg << "(" << getpid() << ") Waiting on OwnerRRef " << rrefId() << " with future " << future_.get(); | |
+ std::cerr << msg.str() << std::endl; | |
TORCH_CHECK( | |
!getTimedOut(), | |
"RRef creation via rpc.remote() timed out, and it " | |
@@ -272,6 +283,9 @@ c10::intrusive_ptr<JitFuture> OwnerRRef::getFuture() { | |
} | |
void OwnerRRef::setValue(IValue&& value) { | |
+ std::ostringstream msg; // DEBUG: log just before future_ is marked completed | |
+ msg << "(" << getpid() << ") Populating OwnerRRef " << rrefId() << " with future " << future_.get(); | |
+ std::cerr << msg.str() << std::endl; | |
future_->markCompleted(value); | |
} | |
diff --git a/torch/csrc/distributed/rpc/rref_impl.h b/torch/csrc/distributed/rpc/rref_impl.h | |
index 2f6975d854..a1d65c69d2 100644 | |
--- a/torch/csrc/distributed/rpc/rref_impl.h | |
+++ b/torch/csrc/distributed/rpc/rref_impl.h | |
@@ -9,6 +9,9 @@ | |
#include <torch/csrc/distributed/rpc/types.h> | |
#include <atomic> | |
+#include <sstream> | |
+#include <iostream> | |
+#include <unistd.h> | |
namespace torch { | |
namespace distributed { | |
@@ -395,6 +398,12 @@ class TORCH_API OwnerRRef final : public RRef { | |
// Gets a future that is satisfied when the value or error is set. | |
c10::intrusive_ptr<JitFuture> getFuture(); | |
+ ~OwnerRRef() override { // DEBUG: trace destruction; NOTE(review): needs getpid from <unistd.h> in this public header | |
+ std::ostringstream msg; | |
+ msg << "(" << getpid() << ") ^^^^ Destructing OwnerRRef (created_on=" << rrefId().createdOn_ << ", local_id=" << rrefId().localId_ << ")"; | |
+ std::cerr << msg.str() << std::endl; | |
+ } | |
+ | |
private: | |
friend class RRefContext; | |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment