Last active
November 2, 2023 00:16
-
-
Save emrekasg/d0d677c47dc6815c1b0f00fd1894c0cd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cpu | |
import ( | |
"runtime" | |
"syscall" | |
) | |
/* | |
#define _GNU_SOURCE | |
#include <sched.h> | |
#include <pthread.h> | |
void lock_thread(int cpuid) { | |
pthread_t tid; | |
cpu_set_t cpuset; | |
tid = pthread_self(); | |
CPU_ZERO(&cpuset); | |
CPU_SET(cpuid, &cpuset); | |
// https://linux.die.net/man/3/pthread_setaffinity_np | |
pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset); | |
} | |
*/ | |
import ( | |
"C" | |
) | |
type Task func() | |
type ShardManager struct { | |
shards []*Shard | |
} | |
type Shard struct { | |
id int | |
taskQueue chan Task | |
totalGoroutines int | |
kernelThreads []KernelThread | |
} | |
type KernelThread struct { | |
id int | |
goroutineCount int | |
} | |
func NewShardManager() *ShardManager { | |
cpuCount := runtime.NumCPU() | |
manager := &ShardManager{ | |
shards: make([]*Shard, cpuCount), | |
} | |
for i := 0; i < cpuCount; i++ { | |
manager.shards[i] = newShard(i) | |
} | |
return manager | |
} | |
func newShard(id int) *Shard { | |
shard := &Shard{ | |
id: id, | |
taskQueue: make(chan Task), | |
kernelThreads: make([]KernelThread, 0), | |
} | |
go func() { | |
// locks the calling goroutine to its current operating system thread. | |
// with this way, we can't run any other goroutine on this thread. | |
runtime.LockOSThread() | |
// lock the thread to the given cpu core | |
C.lock_thread(C.int(id)) | |
for task := range shard.taskQueue { | |
go func() { | |
// We're locking the thread again because we're creating a new goroutine | |
// and scheduler can run this goroutine on the different core. | |
// since we don't want to run it on the different core, | |
// lock_thread will move the thread to the given core. | |
C.lock_thread(C.int(id)) | |
// gettid() returns the caller's thread ID (TID). | |
// https://man7.org/linux/man-pages/man2/gettid.2.html | |
kernelThread := syscall.Gettid() | |
shard.registerKernelThread(kernelThread) | |
task() | |
}() | |
} | |
}() | |
return shard | |
} | |
func (s *Shard) registerKernelThread(threadId int) { | |
for i, kernelThread := range s.kernelThreads { | |
if kernelThread.id == threadId { | |
kernelThread.goroutineCount++ | |
s.kernelThreads[i] = kernelThread | |
return | |
} | |
} | |
s.kernelThreads = append(s.kernelThreads, KernelThread{ | |
id: threadId, | |
goroutineCount: 1, | |
}) | |
} | |
// sched_getcpu() returns the number of the CPU on which the calling thread is currently executing. | |
// https://man7.org/linux/man-pages/man3/sched_getcpu.3.html | |
func GetCpuId() int { | |
cpu := C.sched_getcpu() | |
return int(cpu) | |
} | |
func (cm *ShardManager) GetGoRoutineCount() int { | |
count := 0 | |
for _, core := range cm.shards { | |
count += core.totalGoroutines | |
} | |
return count | |
} | |
func (cm *ShardManager) GetGoRoutineCountByCpu(cpu int) int { | |
return cm.shards[cpu].totalGoroutines | |
} | |
func (cm *ShardManager) RunTask(cpu int, task Task) { | |
cm.shards[cpu].totalGoroutines++ | |
cm.shards[cpu].taskQueue <- task | |
} | |
func (cm *ShardManager) RunTaskOnAllShards(task Task) { | |
for _, shard := range cm.shards { | |
shard.totalGoroutines++ | |
shard.taskQueue <- task | |
} | |
} | |
type CoreInfo struct { | |
CpuId int | |
KernelThreads []KernelThread | |
} | |
func (cm *ShardManager) GetCoreInfo() []CoreInfo { | |
coreInfo := make([]CoreInfo, 0) | |
for _, shard := range cm.shards { | |
coreInfo = append(coreInfo, CoreInfo{ | |
CpuId: shard.id, | |
KernelThreads: shard.kernelThreads, | |
}) | |
} | |
return coreInfo | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment