Skip to content

Instantly share code, notes, and snippets.

@emrekasg
Last active November 2, 2023 00:16
Show Gist options
  • Save emrekasg/d0d677c47dc6815c1b0f00fd1894c0cd to your computer and use it in GitHub Desktop.
Save emrekasg/d0d677c47dc6815c1b0f00fd1894c0cd to your computer and use it in GitHub Desktop.
package cpu
import (
"runtime"
"syscall"
)
/*
#define _GNU_SOURCE
#include <sched.h>
#include <pthread.h>
void lock_thread(int cpuid) {
pthread_t tid;
cpu_set_t cpuset;
tid = pthread_self();
CPU_ZERO(&cpuset);
CPU_SET(cpuid, &cpuset);
// https://linux.die.net/man/3/pthread_setaffinity_np
pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset);
}
*/
import (
"C"
)
type Task func()
type ShardManager struct {
shards []*Shard
}
type Shard struct {
id int
taskQueue chan Task
totalGoroutines int
kernelThreads []KernelThread
}
type KernelThread struct {
id int
goroutineCount int
}
func NewShardManager() *ShardManager {
cpuCount := runtime.NumCPU()
manager := &ShardManager{
shards: make([]*Shard, cpuCount),
}
for i := 0; i < cpuCount; i++ {
manager.shards[i] = newShard(i)
}
return manager
}
func newShard(id int) *Shard {
shard := &Shard{
id: id,
taskQueue: make(chan Task),
kernelThreads: make([]KernelThread, 0),
}
go func() {
// locks the calling goroutine to its current operating system thread.
// with this way, we can't run any other goroutine on this thread.
runtime.LockOSThread()
// lock the thread to the given cpu core
C.lock_thread(C.int(id))
for task := range shard.taskQueue {
go func() {
// We're locking the thread again because we're creating a new goroutine
// and scheduler can run this goroutine on the different core.
// since we don't want to run it on the different core,
// lock_thread will move the thread to the given core.
C.lock_thread(C.int(id))
// gettid() returns the caller's thread ID (TID).
// https://man7.org/linux/man-pages/man2/gettid.2.html
kernelThread := syscall.Gettid()
shard.registerKernelThread(kernelThread)
task()
}()
}
}()
return shard
}
func (s *Shard) registerKernelThread(threadId int) {
for i, kernelThread := range s.kernelThreads {
if kernelThread.id == threadId {
kernelThread.goroutineCount++
s.kernelThreads[i] = kernelThread
return
}
}
s.kernelThreads = append(s.kernelThreads, KernelThread{
id: threadId,
goroutineCount: 1,
})
}
// sched_getcpu() returns the number of the CPU on which the calling thread is currently executing.
// https://man7.org/linux/man-pages/man3/sched_getcpu.3.html
func GetCpuId() int {
cpu := C.sched_getcpu()
return int(cpu)
}
func (cm *ShardManager) GetGoRoutineCount() int {
count := 0
for _, core := range cm.shards {
count += core.totalGoroutines
}
return count
}
func (cm *ShardManager) GetGoRoutineCountByCpu(cpu int) int {
return cm.shards[cpu].totalGoroutines
}
func (cm *ShardManager) RunTask(cpu int, task Task) {
cm.shards[cpu].totalGoroutines++
cm.shards[cpu].taskQueue <- task
}
func (cm *ShardManager) RunTaskOnAllShards(task Task) {
for _, shard := range cm.shards {
shard.totalGoroutines++
shard.taskQueue <- task
}
}
type CoreInfo struct {
CpuId int
KernelThreads []KernelThread
}
func (cm *ShardManager) GetCoreInfo() []CoreInfo {
coreInfo := make([]CoreInfo, 0)
for _, shard := range cm.shards {
coreInfo = append(coreInfo, CoreInfo{
CpuId: shard.id,
KernelThreads: shard.kernelThreads,
})
}
return coreInfo
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment