Skip to content

Instantly share code, notes, and snippets.

@jamesr66a
Created February 23, 2022 21:53
Show Gist options
  • Save jamesr66a/8fdc91eed32c1cac8b18918cfcc16c61 to your computer and use it in GitHub Desktop.
Save jamesr66a/8fdc91eed32c1cac8b18918cfcc16c61 to your computer and use it in GitHub Desktop.
diff --git a/torch/csrc/distributed/rpc/rref_context.cpp b/torch/csrc/distributed/rpc/rref_context.cpp
index 004e9422be..28b2bbc5c2 100644
--- a/torch/csrc/distributed/rpc/rref_context.cpp
+++ b/torch/csrc/distributed/rpc/rref_context.cpp
@@ -4,6 +4,9 @@
#include <sstream>
+#include <iostream>
+#include <unistd.h>
+
namespace torch {
namespace distributed {
namespace rpc {
@@ -101,6 +104,11 @@ std::vector<c10::intrusive_ptr<RRef>> RRefContext::destroyInstance(
deletedRRefs.emplace_back(std::move(rref));
}
}
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
ctx.owners_.clear();
ctx.pendingOwners_.clear();
return deletedRRefs;
@@ -281,6 +289,11 @@ void RRefContext::delAllUsersAndUnforkedOwners(
TORCH_CHECK(
iter != owners_.end(),
c10::str("Did not find OwnerRRef with RRefId: ", rrefId));
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
owners_.erase(iter);
}
}
@@ -315,6 +328,11 @@ c10::intrusive_ptr<OwnerRRef> RRefContext::getOrCreateOwnerRRef(
std::lock_guard<std::mutex> lock(mutex_);
const auto iter = owners_.find(rrefId);
if (iter == owners_.end()) {
+ if (rrefId.localId_ == 30) {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") ^^^^ Scenario 1 (created_on=" << rrefId.createdOn_ << ", local_id=" << rrefId.localId_ << ")";
+ std::cerr << ss.str() << std::endl;
+ }
// Scenario (1) the first time this owner knows about this RRef
//
// NB: cannot use make_shared here as the constructor of OwnerRRef is
@@ -331,6 +349,11 @@ c10::intrusive_ptr<OwnerRRef> RRefContext::getOrCreateOwnerRRef(
}
return rref;
} else {
+ if (rrefId.localId_ == 30) {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") ^^^^ Scenario 2 (created_on=" << rrefId.createdOn_ << ", local_id=" << rrefId.localId_ << ")";
+ std::cerr << ss.str() << std::endl;
+ }
// Scenario (2) retrieving an existing RRef
auto ownerRRef = fromRRefInterface(iter->second);
// Now double check if the two types match
@@ -446,6 +469,11 @@ RRefForkData RRefContext::prepareChildFork(
// this is needed for OwnerRRefs that were created locally.
{
std::lock_guard<std::mutex> lock(mutex_);
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rref->rrefId() << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
owners_[rref->rrefId()] = rref;
}
} else {
@@ -714,6 +742,11 @@ void RRefContext::finishForkRequest(const ForkId& forkId, worker_id_t parent) {
void RRefContext::addSelfAsFork(c10::intrusive_ptr<OwnerRRef>& rref) {
std::lock_guard<std::mutex> lock(mutex_);
const auto& rrefId = rref->rrefId();
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
owners_[rrefId] = rref;
auto& rrefForks = forks_[rrefId];
TORCH_INTERNAL_ASSERT(
@@ -731,6 +764,11 @@ void RRefContext::addForkOfOwner(const RRefId& rrefId, const ForkId& forkId) {
"Got fork notification twice on the same RRef ",
forkId);
rrefForks.insert(forkId);
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
}
void RRefContext::addForkOfOwnerIfNotPresent(
@@ -744,9 +782,19 @@ void RRefContext::addForkOfOwnerIfNotPresent(
// function are idempotent.
if (rrefForks.find(forkId) == rrefForks.end()) {
rrefForks.insert(forkId);
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
} else {
LOG(INFO) << "Ignoring duplicate request to add Fork of OwnerRRef with "
<< "RRefId = " << rrefId << ", ForkId = " << forkId;
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
}
}
@@ -761,6 +809,11 @@ c10::intrusive_ptr<RRef> RRefContext::delForkOfOwner(
// statements to ensure this function is idempotent. This makes it safe to
// retry RRefUserDelete messages.
{
+ {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") " << __FILE__ << ":" << __LINE__ << " " << rrefId << " " << forkId << "\n";
+ std::cerr << ss.str() << std::endl;
+ }
std::lock_guard<std::mutex> lock(mutex_);
auto rrefIter = forks_.find(rrefId);
if (rrefIter != forks_.end()) {
diff --git a/torch/csrc/distributed/rpc/rref_impl.cpp b/torch/csrc/distributed/rpc/rref_impl.cpp
index c8b98f8cea..012a614ee3 100644
--- a/torch/csrc/distributed/rpc/rref_impl.cpp
+++ b/torch/csrc/distributed/rpc/rref_impl.cpp
@@ -10,6 +10,11 @@
#include <torch/csrc/distributed/rpc/rref_proto.h>
#include <torch/csrc/distributed/rpc/utils.h>
+#include <thread>
+#include <iostream>
+#include <unistd.h>
+#include <sstream>
+
namespace {
// If the type is subtype of named type, return its qualifiedname, otherwise
// return its type str.
@@ -248,6 +253,9 @@ OwnerRRef::OwnerRRef(
std::vector<c10::Device> devices)
: RRef(ownerId, rrefId, type) {
future_ = c10::make_intrusive<JitFuture>(type_, std::move(devices));
+ std::stringstream ss;
+ ss << "(" << getpid() << ") Instantiating OwnerRRef " << rrefId << " with future " << future_.get();
+ std::cerr << ss.str() << std::endl;
if (value.has_value()) {
future_->markCompleted(value.value());
@@ -255,6 +263,9 @@ OwnerRRef::OwnerRRef(
}
const IValue& OwnerRRef::getValue() const {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") Waiting on OwnerRRef " << rrefId() << " with future " << future_.get();
+ std::cerr << ss.str() << std::endl;
TORCH_CHECK(
!getTimedOut(),
"RRef creation via rpc.remote() timed out, and it "
@@ -272,6 +283,9 @@ c10::intrusive_ptr<JitFuture> OwnerRRef::getFuture() {
}
void OwnerRRef::setValue(IValue&& value) {
+ std::stringstream ss;
+ ss << "(" << getpid() << ") Populating OwnerRRef " << rrefId() << " with future " << future_.get();
+ std::cerr << ss.str() << std::endl;
future_->markCompleted(value);
}
diff --git a/torch/csrc/distributed/rpc/rref_impl.h b/torch/csrc/distributed/rpc/rref_impl.h
index 2f6975d854..a1d65c69d2 100644
--- a/torch/csrc/distributed/rpc/rref_impl.h
+++ b/torch/csrc/distributed/rpc/rref_impl.h
@@ -9,6 +9,9 @@
#include <torch/csrc/distributed/rpc/types.h>
#include <atomic>
+#include <sstream>
+#include <iostream>
+#include <unistd.h>
namespace torch {
namespace distributed {
@@ -395,6 +398,12 @@ class TORCH_API OwnerRRef final : public RRef {
// Gets a future that is satisfied when the value or error is set.
c10::intrusive_ptr<JitFuture> getFuture();
+ ~OwnerRRef() override { // Debug trace: report OwnerRRef destruction on stderr.
+ std::stringstream ss; // Assemble the full message in one string before writing it out.
+ ss << "(" << getpid() << ") ^^^^ Destructing OwnerRRef (created_on=" << rrefId().createdOn_ << ", local_id=" << rrefId().localId_ << ")"; // Tag with process id and the RRefId's createdOn_/localId_ fields.
+ std::cerr << ss.str() << std::endl;
+ }
+
private:
friend class RRefContext;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment