package kafkaserver
import (
"context"
"fmt"
"io"
"log"
"os"
)
type TierStateMachine struct {
leader LeaderEndPoint
replicaMgr ReplicaManager
useFutureLog bool
}
func NewTierStateMachine(leader LeaderEndPoint, replicaMgr ReplicaManager, useFutureLog bool) *TierStateMachine {
return &TierStateMachine{
leader: leader,
replicaMgr: replicaMgr,
useFutureLog: useFutureLog,
}
}
type PartitionFetchState struct {
topicId string
offsetToFetch int64
initialLag int64
currentLeaderEpoch int32
unifiedLog UnifiedLog
}
func (t *TierStateMachine) Start(topicPartition TopicPartition, currentFetchState PartitionFetchState, fetchPartitionData PartitionData) (*PartitionFetchState, error) {
epochAndLeaderLocalStartOffset := t.leader.FetchEarliestLocalOffset(context.Background(), topicPartition, int32(currentFetchState.currentLeaderEpoch))
epoch := epochAndLeaderLocalStartOffset.LeaderEpoch
leaderLocalStartOffset := epochAndLeaderLocalStartOffset.Offset
offsetToFetch, err := t.buildRemoteLogAuxState(topicPartition, int32(currentFetchState.currentLeaderEpoch), leaderLocalStartOffset, epoch, fetchPartitionData.LogStartOffset, t.replicaMgr.GetUnifiedLogOrException(topicPartition))
if err != nil {
return nil, err
}
fetchLatestOffsetResult := t.leader.FetchLatestOffset(context.Background(), topicPartition, int32(currentFetchState.currentLeaderEpoch))
leaderEndOffset := fetchLatestOffsetResult.Offset
initialLag := leaderEndOffset - offsetToFetch
return &PartitionFetchState{
topicId: topicPartition.Topic(),
offsetToFetch: offsetToFetch,
initialLag: initialLag,
currentLeaderEpoch: currentFetchState.currentLeaderEpoch,
unifiedLog: t.replicaMgr.GetUnifiedLogOrException(topicPartition),
}, nil
}
func (t *TierStateMachine) buildRemoteLogAuxState(topicPartition TopicPartition, currentLeaderEpoch int32, leaderLocalStartOffset int64, epoch int32, leaderLogStartOffset int64, unifiedLog UnifiedLog) (int64, error) {
if !unifiedLog.RemoteStorageSystemEnable() || !unifiedLog.Config().RemoteStorageEnable() {
return 0, fmt.Errorf("tiered storage is not enabled for partition %s", topicPartition)
}
remoteLogManager := t.replicaMgr.GetRemoteLogManagerOrException()
if remoteLogManager == nil {
return 0, fmt.Errorf("remote log manager is not yet instantiated")
}
remoteLogSegmentMetadata := remoteLogManager.FetchRemoteLogSegmentMetadata(context.Background(), topicPartition, epoch, leaderLocalStartOffset-1)
if remoteLogSegmentMetadata == nil {
return 0, fmt.Errorf("previous remote log segment metadata was not found for partition %s", topicPartition)
}
nextOffset := remoteLogSegmentMetadata.EndOffset + 1
partition := t.replicaMgr.GetPartitionOrException(topicPartition)
partition.TruncateFullyAndStartAt(nextOffset, t.useFutureLog, leaderLogStartOffset)
unifiedLog.MaybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
unifiedLog.MaybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented)
epochs, err := t.readLeaderEpochCheckpoint(remoteLogManager, remoteLogSegmentMetadata)
if err != nil {
return 0, err
}
if unifiedLog.LeaderEpochCache().IsDefined() {
unifiedLog.LeaderEpochCache().Get().Assign(epochs)
}
log.Printf("Updated the epoch cache from remote tier till offset: %d with size: %d for %v", leaderLocalStartOffset, len(epochs), partition)
if err := t.buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, remoteLogManager); err != nil {
return 0, err
}
return nextOffset, nil
}
func (t *TierStateMachine) readLeaderEpochCheckpoint(remoteLogManager RemoteLogManager, remoteLogSegmentMetadata RemoteLogSegmentMetadata) ([]EpochEntry, error) {
indexInputStream := remoteLogManager.StorageManager().FetchIndex(context.Background(), remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LeaderEpoch)
defer indexInputStream.Close()
checkpointFile := new(CheckpointFile)
checkpointFile.Read(indexInputStream)
return checkpointFile.GetCheckpointReadBuffer("leader_epoch_checkpoint", 0, LeaderEpochCheckpointFile.Formatter).Read()
}
func (t *TierStateMachine) buildProducerSnapshotFile(unifiedLog UnifiedLog, nextOffset int64, remoteLogSegmentMetadata RemoteLogSegmentMetadata, remoteLogManager RemoteLogManager) error {
snapshotFile := LogFileUtils.ProducerSnapshotFile(unifiedLog.Dir(), nextOffset)
tmpSnapshotFile := snapshotFile + ".tmp"
// Fetch the producer snapshot from remote storage and copy it into place
// atomically: write to a temporary file first, then rename it over the target.
indexInputStream := remoteLogManager.StorageManager().FetchIndex(context.Background(), remoteLogSegmentMetadata, RemoteStorageManager.IndexType.ProducerSnapshot)
defer indexInputStream.Close()
tmpFile, err := os.Create(tmpSnapshotFile)
if err != nil {
return err
}
if _, err := io.Copy(tmpFile, indexInputStream); err != nil {
tmpFile.Close()
return err
}
if err := tmpFile.Close(); err != nil {
return err
}
return os.Rename(tmpSnapshotFile, snapshotFile)
}
Note that I've assumed the following:
* `LeaderEndPoint` and `ReplicaManager` are custom types that implement the necessary interfaces.
* `UnifiedLog` is a custom type that implements the necessary interfaces.
* `RemoteLogManager` is a custom type that implements the necessary interfaces.
* `TopicPartition`, `PartitionData`, and other types are assumed to be defined elsewhere in the codebase.
Also, I've used Go's standard library logging package (`log`) in place of SLF4J, which has no Go equivalent; if you need leveled or structured logging, swap in a Go logging library and adjust the logging calls accordingly.
Please note that this is only an approximation of the original Java code in Go; behavior and performance may differ because of language-specific features and idioms.
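To make those assumptions concrete, here is a standalone sketch of how the assumed types could be declared in Go. Everything in it (the `OffsetAndEpoch` struct, the interface method sets, the `kafkaserver` package name) is inferred from the calls in the port above, not taken from Kafka itself.

```go
// Hypothetical declarations for the types assumed above. The method sets are
// inferred from the calls in the ported code and are illustrative only.
package kafkaserver

import "context"

// TopicPartition identifies a partition of a topic.
type TopicPartition struct {
	topic     string
	partition int32
}

func (tp TopicPartition) Topic() string    { return tp.topic }
func (tp TopicPartition) Partition() int32 { return tp.partition }

// OffsetAndEpoch is the assumed result of the leader offset lookups.
type OffsetAndEpoch struct {
	LeaderEpoch int32
	Offset      int64
}

// UnifiedLog, Partition and RemoteLogManager need many more methods in the
// real code; their method sets are elided here.
type (
	UnifiedLog       interface{}
	Partition        interface{}
	RemoteLogManager interface{}
)

// LeaderEndPoint is a guess at the leader-facing API the state machine needs.
type LeaderEndPoint interface {
	FetchEarliestLocalOffset(ctx context.Context, tp TopicPartition, currentLeaderEpoch int32) OffsetAndEpoch
	FetchLatestOffset(ctx context.Context, tp TopicPartition, currentLeaderEpoch int32) OffsetAndEpoch
}

// ReplicaManager is a guess at the replica-manager surface used above.
type ReplicaManager interface {
	GetUnifiedLogOrException(tp TopicPartition) UnifiedLog
	GetPartitionOrException(tp TopicPartition) Partition
	GetRemoteLogManagerOrException() RemoteLogManager
}
```

With declarations along these lines, the port above depends only on small interfaces and can be unit-tested with fakes.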
// Ported from https://github.com/eclipse/hawkbit/blob/master/hawkbit-rest/hawkbit-mgmt-resource/src/main/java/org/eclipse/hawkbit/mgmt/rest/resource/MgmtTargetResource.java
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
)
type Target struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Address string `json:"address"`
SecurityToken string `json:"securityToken"`
}
// TargetManagement is an in-memory target store. The mutex guards the map
// because net/http serves requests concurrently.
type TargetManagement struct {
mu sync.RWMutex
targets map[string]Target
}
func NewTargetManagement() *TargetManagement {
return &TargetManagement{
targets: make(map[string]Target),
}
}
func (tm *TargetManagement) GetTarget(id string) (Target, bool) {
tm.mu.RLock()
defer tm.mu.RUnlock()
target, ok := tm.targets[id]
return target, ok
}
func (tm *TargetManagement) CreateTarget(target Target) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.targets[target.ID] = target
}
func (tm *TargetManagement) UpdateTarget(id string, target Target) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.targets[id] = target
}
func (tm *TargetManagement) DeleteTarget(id string) {
tm.mu.Lock()
defer tm.mu.Unlock()
delete(tm.targets, id)
}
func main() {
targetManagement := NewTargetManagement()
router := mux.NewRouter()
router.HandleFunc("/targets/{id}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id := vars["id"]
switch r.Method {
case http.MethodGet:
getTarget(w, r, id, targetManagement)
case http.MethodPut:
updateTarget(w, r, id, targetManagement)
case http.MethodDelete:
deleteTarget(w, r, id, targetManagement)
default:
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}).Methods("GET", "PUT", "DELETE")
router.HandleFunc("/targets", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
createTarget(w, r, targetManagement)
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}).Methods("POST")
log.Fatal(http.ListenAndServe(":8080", router))
}
func getTarget(w http.ResponseWriter, r *http.Request, id string, tm *TargetManagement) {
target, ok := tm.GetTarget(id)
if !ok {
http.Error(w, "Target not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(target)
}
func createTarget(w http.ResponseWriter, r *http.Request, tm *TargetManagement) {
var target Target
err := json.NewDecoder(r.Body).Decode(&target)
if err != nil {
http.Error(w, "Invalid request payload", http.StatusBadRequest)
return
}
tm.CreateTarget(target)
w.WriteHeader(http.StatusCreated)
}
func updateTarget(w http.ResponseWriter, r *http.Request, id string, tm *TargetManagement) {
var target Target
err := json.NewDecoder(r.Body).Decode(&target)
if err != nil {
http.Error(w, "Invalid request payload", http.StatusBadRequest)
return
}
tm.UpdateTarget(id, target)
w.WriteHeader(http.StatusOK)
}
func deleteTarget(w http.ResponseWriter, r *http.Request, id string, tm *TargetManagement) {
tm.DeleteTarget(id)
w.WriteHeader(http.StatusOK)
}
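As a quick sanity check, the handlers above can be exercised end to end with `net/http/httptest`. The test below mirrors the route setup from `main` (only the GET and POST routes, and only the JSON fields it needs) and assumes it lives in a `*_test.go` file in the same package as the code above.

```go
package main

import (
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"github.com/gorilla/mux"
)

// TestTargetLifecycle drives create and get through the HTTP layer.
func TestTargetLifecycle(t *testing.T) {
	tm := NewTargetManagement()
	router := mux.NewRouter()
	router.HandleFunc("/targets/{id}", func(w http.ResponseWriter, r *http.Request) {
		getTarget(w, r, mux.Vars(r)["id"], tm)
	}).Methods("GET")
	router.HandleFunc("/targets", func(w http.ResponseWriter, r *http.Request) {
		createTarget(w, r, tm)
	}).Methods("POST")

	srv := httptest.NewServer(router)
	defer srv.Close()

	// Create a target.
	body := `{"id":"t1","name":"edge-device","address":"10.0.0.1"}`
	resp, err := http.Post(srv.URL+"/targets", "application/json", strings.NewReader(body))
	if err != nil {
		t.Fatalf("create request failed: %v", err)
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusCreated {
		t.Fatalf("unexpected create status: %d", resp.StatusCode)
	}

	// Fetch it back.
	resp, err = http.Get(srv.URL + "/targets/t1")
	if err != nil {
		t.Fatalf("get request failed: %v", err)
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("unexpected get status: %d", resp.StatusCode)
	}
}
```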
Anton Shelin, [26.09.2024 12:12]
This code is the TierStateMachine class, which is part of Apache Kafka and manages tiered-storage state in Kafka. The main purpose of the class is to handle the state transitions involved in tiered storage and to keep local and remote logs interacting correctly.
Main components and logic:
License and imports:
The code starts with the Apache 2.0 license header.
It imports various classes and utilities from the Kafka libraries and other dependencies.
The TierStateMachine class:
The class has several private fields, such as leader, replicaMgr, and useFutureLog, which are initialized in the constructor.
The main start method starts the state machine for a given topic partition.
The start method:
The method takes topicPartition, currentFetchState, and fetchPartitionData.
It performs the following steps:
Fetches the leader's earliest local offset for the topic partition.
Decides which log to use (the future log or the local one).
Builds the remote log auxiliary state up to the leader's local start offset.
Fetches the leader's latest offset.
Computes the initial lag.
Returns a new PartitionFetchState.
The fetchEarlierEpochEndOffset method:
This method fetches the end offset of the previous epoch from the leader.
It builds a request for the end offset of the epoch preceding the current one and processes the response.
The readLeaderEpochCheckpoint method:
Reads the leader epoch checkpoint from remote storage.
Uses the RemoteLogManager to fetch the index and reads it with a BufferedReader.
The buildProducerSnapshotFile method:
Restores the producer snapshot file from remote storage.
Copies the snapshot file into local storage and reloads the producer state (a minimal sketch of this copy pattern follows at the end of this message).
The buildRemoteLogAuxState method:
This method builds the remote log auxiliary state for the given topic partition.
It performs the following steps:
Checks whether remote storage is enabled.
Obtains the RemoteLogManager.
Determines the target epoch for the previous offset.
Fetches the remote log segment metadata.
Updates the leader epoch cache and the producer snapshots.
Returns the next offset to fetch from the leader.
How it works:
Initialization:
The class is initialized with the leader, the replica manager, and a flag for whether to use the future log.
Starting the state machine:
The start method kicks off building the remote log auxiliary state.
It fetches the required offsets and epochs, builds the state, and returns a new fetch state.
Building the state:
The buildRemoteLogAuxState method does the bulk of the work.
It talks to remote storage to fetch metadata and to update the local state.
Error handling:
The code handles various error cases, such as a missing response from the leader, remote storage failures, and so on.
This class plays a key role in keeping local and remote logs consistent in Kafka, which allows data to be managed efficiently and keeps it intact.
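The snapshot-copy step described above is the usual write-to-a-temporary-file-then-rename pattern. Here is a minimal, self-contained Go sketch of that pattern; the file names are invented for illustration.

```go
package main

import (
	"io"
	"log"
	"os"
)

// copyAtomically writes src's contents to dst by first writing a temporary
// file next to dst and then renaming it into place, so readers never observe
// a partially written dst.
func copyAtomically(src io.Reader, dst string) error {
	tmp := dst + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := io.Copy(f, src); err != nil {
		f.Close()
		os.Remove(tmp)
		return err
	}
	if err := f.Close(); err != nil {
		os.Remove(tmp)
		return err
	}
	// On POSIX file systems, rename within the same directory is atomic.
	return os.Rename(tmp, dst)
}

func main() {
	src, err := os.Open("producer-snapshot.remote") // invented source file name
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()
	if err := copyAtomically(src, "00000000000000000042.snapshot"); err != nil {
		log.Fatal(err)
	}
}
```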
Anton Shelin, [26.09.2024 12:15]
Rewriting complex Java code in Go requires significant changes, because Go has its own idioms and programming approach. Below is an example of how the main parts of the TierStateMachine class can be rewritten in Go. Note that this is a simplified example; full functionality would require additional work.
Anton Shelin, [26.09.2024 12:15]
Main changes and differences:
Imports and structure:
In Go, imports and code layout differ from Java. Imports sit at the top of the file, and the code structure is flatter.
Structs and methods:
Go uses structs and methods bound to those structs.
Constructors are replaced by functions that return new struct instances.
Error handling:
In Go, errors are returned as values rather than thrown as exceptions (see the sketch after this list).
Files and streams:
Go uses the os and io packages for working with files and streams.
Logging:
Go's fmt package (or the standard log package) is used for log output.
This example demonstrates Go's core idioms, such as structs, methods, error handling, and file I/O. Full functionality would require additional work, including implementing all the necessary interfaces and data structures.
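To illustrate the constructor-function and errors-as-values idioms mentioned above, here is a small self-contained Go example; the `Store` type and its methods are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is a toy type; NewStore plays the role a Java constructor would.
type Store struct {
	items map[string]string
}

func NewStore() *Store {
	return &Store{items: make(map[string]string)}
}

// Get returns an error value instead of throwing an exception.
func (s *Store) Get(key string) (string, error) {
	v, ok := s.items[key]
	if !ok {
		return "", errors.New("key not found: " + key)
	}
	return v, nil
}

func main() {
	s := NewStore()
	s.items["leader"] = "broker-1"
	if v, err := s.Get("leader"); err != nil {
		fmt.Println("error:", err)
	} else {
		fmt.Println("value:", v)
	}
	if _, err := s.Get("missing"); err != nil {
		fmt.Println("error:", err) // errors are ordinary values, handled inline
	}
}
```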
<mxfile host="app.diagrams.net">
<diagram name="TierStateMachine">
<mxGraphModel dx="1280" dy="720" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="2" value="Start" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="20" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="3" value="FetchEarliestLocalOffset" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="100" width="180" height="60" as="geometry" />
</mxCell>
<mxCell id="4" value="BuildRemoteLogAuxState" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="180" width="180" height="60" as="geometry" />
</mxCell>
<mxCell id="5" value="FetchLatestOffset" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="260" width="180" height="60" as="geometry" />
</mxCell>
<mxCell id="6" value="CalculateInitialLag" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="340" width="180" height="60" as="geometry" />
</mxCell>
<mxCell id="7" value="ReturnNewPartitionFetchState" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="20" y="420" width="180" height="60" as="geometry" />
</mxCell>
<mxCell id="8" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="2" target="3">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="9" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="3" target="4">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="10" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="4" target="5">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="11" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="5" target="6">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="12" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="6" target="7">
<mxGeometry relative="1" as="geometry" />
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
use log::{info, debug, error};
use std::collections::HashMap;
use std::fs::{File, copy};
use std::io::{BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum TierStateMachineError {
#[error("Remote storage exception: {0}")]
RemoteStorageException(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Kafka exception: {0}")]
KafkaException(String),
}
pub struct TierStateMachine {
leader: Arc<LeaderEndPoint>,
replica_mgr: Arc<ReplicaManager>,
use_future_log: bool,
}
impl TierStateMachine {
pub fn new(leader: Arc<LeaderEndPoint>, replica_mgr: Arc<ReplicaManager>, use_future_log: bool) -> Self {
Self {
leader,
replica_mgr,
use_future_log,
}
}
pub async fn start(
&self,
topic_partition: TopicPartition,
current_fetch_state: PartitionFetchState,
fetch_partition_data: PartitionData,
) -> Result<PartitionFetchState, TierStateMachineError> {
let epoch_and_leader_local_start_offset = self.leader.fetch_earliest_local_offset(
&topic_partition,
current_fetch_state.current_leader_epoch(),
).await?;
let epoch = epoch_and_leader_local_start_offset.leader_epoch();
let leader_local_start_offset = epoch_and_leader_local_start_offset.offset();
let unified_log = if self.use_future_log {
self.replica_mgr.future_log_or_exception(&topic_partition).await?
} else {
self.replica_mgr.local_log_or_exception(&topic_partition).await?
};
let offset_to_fetch = self.build_remote_log_aux_state(
&topic_partition,
current_fetch_state.current_leader_epoch(),
leader_local_start_offset,
epoch,
fetch_partition_data.log_start_offset(),
&unified_log,
).await?;
let fetch_latest_offset_result = self.leader.fetch_latest_offset(
&topic_partition,
current_fetch_state.current_leader_epoch(),
).await?;
let leader_end_offset = fetch_latest_offset_result.offset();
let initial_lag = leader_end_offset - offset_to_fetch;
Ok(PartitionFetchState::new(
current_fetch_state.topic_id(),
offset_to_fetch,
Some(initial_lag),
current_fetch_state.current_leader_epoch(),
Fetching,
unified_log.latest_epoch(),
))
}
async fn build_remote_log_aux_state(
&self,
topic_partition: &TopicPartition,
current_leader_epoch: i32,
leader_local_log_start_offset: i64,
epoch_for_leader_local_log_start_offset: i32,
leader_log_start_offset: i64,
unified_log: &UnifiedLog,
) -> Result<i64, TierStateMachineError> {
if !unified_log.remote_storage_system_enable() || !unified_log.config().remote_storage_enable() {
return Err(TierStateMachineError::RemoteStorageException(
format!("Couldn't build the state from remote store for partition {}, as remote log storage is not yet enabled", topic_partition),
));
}
let rlm = self.replica_mgr.remote_log_manager().ok_or_else(|| {
TierStateMachineError::RemoteStorageException("RemoteLogManager is not yet instantiated".into())
})?;
let previous_offset_to_leader_local_log_start_offset = leader_local_log_start_offset - 1;
let target_epoch = if epoch_for_leader_local_log_start_offset == 0 {
epoch_for_leader_local_log_start_offset
} else {
let earlier_epoch_end_offset = self.fetch_earlier_epoch_end_offset(
epoch_for_leader_local_log_start_offset,
topic_partition,
current_leader_epoch,
).await?;
if earlier_epoch_end_offset.end_offset() > previous_offset_to_leader_local_log_start_offset {
earlier_epoch_end_offset.leader_epoch()
} else {
epoch_for_leader_local_log_start_offset
}
};
let remote_log_segment_metadata = rlm.fetch_remote_log_segment_metadata(
topic_partition,
target_epoch,
previous_offset_to_leader_local_log_start_offset,
).await.ok_or_else(|| {
TierStateMachineError::RemoteStorageException(format!(
"Couldn't build the state from remote store for partition: {}, currentLeaderEpoch: {}, leaderLocalLogStartOffset: {}, leaderLogStartOffset: {}, epoch: {}",
topic_partition, current_leader_epoch, leader_local_log_start_offset, leader_log_start_offset, target_epoch
))
})?;
let next_offset = remote_log_segment_metadata.end_offset() + 1;
let partition = self.replica_mgr.get_partition_or_exception(topic_partition).await?;
partition.truncate_fully_and_start_at(next_offset, self.use_future_log, Some(leader_log_start_offset)).await?;
unified_log.maybe_increment_log_start_offset(leader_log_start_offset, LeaderOffsetIncremented).await?;
unified_log.maybe_increment_local_log_start_offset(next_offset, LeaderOffsetIncremented).await?;
let epochs = self.read_leader_epoch_checkpoint(&rlm, &remote_log_segment_metadata).await?;
if let Some(leader_epoch_cache) = unified_log.leader_epoch_cache() {
leader_epoch_cache.assign(epochs);
}
info!(
"Updated the epoch cache from remote tier till offset: {} with size: {} for {}",
leader_local_log_start_offset, epochs.len(), partition
);
self.build_producer_snapshot_file(unified_log, next_offset, &remote_log_segment_metadata, &rlm).await?;
debug!(
"Built the leader epoch cache and producer snapshots from remote tier for {}, with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
partition, unified_log.producer_state_manager().active_producers().len(), leader_log_start_offset, next_offset
);
Ok(next_offset)
}
async fn fetch_earlier_epoch_end_offset(
&self,
epoch: i32,
partition: &TopicPartition,
current_leader_epoch: i32,
) -> Result<EpochEndOffset, TierStateMachineError> {
let previous_epoch = epoch - 1;
let mut partitions_with_epochs = HashMap::new();
partitions_with_epochs.insert(
partition.clone(),
OffsetForLeaderPartition {
partition: partition.partition(),
current_leader_epoch,
leader_epoch: previous_epoch,
},
);
let maybe_epoch_end_offset = self.leader.fetch_epoch_end_offsets(partitions_with_epochs).await?;
let epoch_end_offset = maybe_epoch_end_offset.get(partition).ok_or_else(|| {
TierStateMachineError::KafkaException(format!("No response received for partition: {}", partition))
})?;
if epoch_end_offset.error_code() != Errors::NONE.code() {
return Err(TierStateMachineError::KafkaException(format!(
"Error code: {}",
epoch_end_offset.error_code()
)));
}
Ok(epoch_end_offset.clone())
}
async fn read_leader_epoch_checkpoint(
&self,
rlm: &RemoteLogManager,
remote_log_segment_metadata: &RemoteLogSegmentMetadata,
) -> Result<Vec<EpochEntry>, TierStateMachineError> {
let input_stream = rlm.storage_manager().fetch_index(remote_log_segment_metadata, IndexType::LeaderEpoch).await?;
let mut buf_reader = BufReader::new(input_stream);
let mut contents = String::new();
buf_reader.read_to_string(&mut contents)?;
let read_buffer = CheckpointReadBuffer::new("", &contents, 0, LeaderEpochCheckpointFile::FORMATTER);
Ok(read_buffer.read()?)
}
async fn build_producer_snapshot_file(
&self,
unified_log: &UnifiedLog,
next_offset: i64,
remote_log_segment_metadata: &RemoteLogSegmentMetadata,
rlm: &RemoteLogManager,
) -> Result<(), TierStateMachineError> {
let snapshot_file = LogFileUtils::producer_snapshot_file(unified_log.dir(), next_offset);
let tmp_snapshot_file = PathBuf::from(snapshot_file.as_path().to_str().unwrap()).with_extension("tmp");
let mut input_stream = rlm.storage_manager().fetch_index(remote_log_segment_metadata, IndexType::ProducerSnapshot).await?;
let mut tmp_file = File::create(&tmp_snapshot_file)?;
std::io::copy(&mut input_stream, &mut tmp_file)?;
std::fs::rename(tmp_snapshot_file, snapshot_file)?;
unified_log.producer_state_manager().truncate_fully_and_reload_snapshots().await?;
unified_log.load_producer_state(next_offset).await?;
Ok(())
}
}
// Placeholder structs and enums
struct LeaderEndPoint;
struct ReplicaManager;
struct TopicPartition;
struct PartitionFetchState;
struct PartitionData;
struct UnifiedLog;
struct OffsetForLeaderPartition;
struct EpochEndOffset;
struct RemoteLogManager;
struct RemoteLogSegmentMetadata;
struct CheckpointReadBuffer;
struct LeaderEpochCheckpointFile;
struct LogFileUtils;
struct Errors;
struct Fetching;
struct LeaderOffsetIncremented;
// Placeholders referenced above but missing from the original list.
struct RemoteStorageManager;
struct EpochEntry;
impl LeaderEndPoint {
async fn fetch_earliest_local_offset(&self, _topic_partition: &TopicPartition, _current_leader_epoch: i32) -> Result<OffsetAndEpoch, TierStateMachineError> {
Ok(OffsetAndEpoch { leader_epoch: 0, offset: 0 })
}
async fn fetch_latest_offset(&self, _topic_partition: &TopicPartition, _current_leader_epoch: i32) -> Result<OffsetAndEpoch, TierStateMachineError> {
Ok(OffsetAndEpoch { leader_epoch: 0, offset: 0 })
}
async fn fetch_epoch_end_offsets(&self, _partitions_with_epochs: HashMap<TopicPartition, OffsetForLeaderPartition>) -> Result<HashMap<TopicPartition, EpochEndOffset>, TierStateMachineError> {
Ok(HashMap::new())
}
}
impl ReplicaManager {
async fn future_log_or_exception(&self, _topic_partition: &TopicPartition) -> Result<UnifiedLog, TierStateMachineError> {
Ok(UnifiedLog)
}
async fn local_log_or_exception(&self, _topic_partition: &TopicPartition) -> Result<UnifiedLog, TierStateMachineError> {
Ok(UnifiedLog)
}
fn remote_log_manager(&self) -> Option<RemoteLogManager> {
Some(RemoteLogManager)
}
async fn get_partition_or_exception(&self, _topic_partition: &TopicPartition) -> Result<Partition, TierStateMachineError> {
Ok(Partition)
}
}
impl UnifiedLog {
fn remote_storage_system_enable(&self) -> bool {
true
}
fn config(&self) -> Config {
Config
}
async fn maybe_increment_log_start_offset(&self, _offset: i64, _reason: LeaderOffsetIncremented) -> Result<(), TierStateMachineError> {
Ok(())
}
async fn maybe_increment_local_log_start_offset(&self, _offset: i64, _reason: LeaderOffsetIncremented) -> Result<(), TierStateMachineError> {
Ok(())
}
fn leader_epoch_cache(&self) -> Option<LeaderEpochCache> {
Some(LeaderEpochCache)
}
fn producer_state_manager(&self) -> ProducerStateManager {
ProducerStateManager
}
async fn load_producer_state(&self, _offset: i64) -> Result<(), TierStateMachineError> {
Ok(())
}
fn latest_epoch(&self) -> i32 {
0
}
fn dir(&self) -> &str {
""
}
}
impl RemoteLogManager {
fn storage_manager(&self) -> RemoteStorageManager {
RemoteStorageManager
}
async fn fetch_remote_log_segment_metadata(&self, _topic_partition: &TopicPartition, _target_epoch: i32, _previous_offset: i64) -> Result<Option<RemoteLogSegmentMetadata>, TierStateMachineError> {
Ok(Some(RemoteLogSegmentMetadata))
}
}
impl RemoteStorageManager {
async fn fetch_index(&self, _remote_log_segment_metadata: &RemoteLogSegmentMetadata, _index_type: IndexType) -> Result<File, TierStateMachineError> {
Ok(File::open("path/to/file")?)
}
}
impl Partition {
async fn truncate_fully_and_start_at(&self, _offset: i64, _use_future_log: bool, _leader_log_start_offset: Option<i64>) -> Result<(), TierStateMachineError> {
Ok(())
}
}
impl ProducerStateManager {
async fn truncate_fully_and_reload_snapshots(&self) -> Result<(), TierStateMachineError> {
Ok(())
}
fn active_producers(&self) -> Vec<Producer> {
vec![]
}
}
impl CheckpointReadBuffer {
fn new(_name: &str, _contents: &str, _offset: usize, _formatter: Formatter) -> Self {
Self
}
fn read(&self) -> Result<Vec<EpochEntry>, TierStateMachineError> {
Ok(vec![])
}
}
impl LeaderEpochCheckpointFile {
const FORMATTER: Formatter = Formatter;
}
impl LogFileUtils {
fn producer_snapshot_file(_dir: &str, _offset: i64) -> PathBuf {
PathBuf::new()
}
}
impl Errors {
const NONE: ErrorCode = ErrorCode { code: 0 };
}
struct OffsetAndEpoch {
leader_epoch: i32,
offset: i64,
}
struct Config;
struct LeaderEpochCache;
struct ProducerStateManager;
struct Producer;
struct Formatter;
struct ErrorCode {
code: i32,
}
enum IndexType {
LeaderEpoch,
ProducerSnapshot,
}
struct Partition;
#[tokio::main]
async fn main() {
// Example usage
}
package kafka
import (
"errors"
"fmt"
"io"
"os"
// NOTE: the Apache Kafka project does not publish Go packages; the common, log,
// and storage imports below are placeholders for hand-written interfaces.
"github.com/apache/kafka/common"
"github.com/apache/kafka/log"
"github.com/apache/kafka/storage"
)
// TierStateMachine manages the state transitions related to tiering in Kafka.
type TierStateMachine struct {
leader LeaderEndPoint
replicaMgr ReplicaManager
useFutureLog bool
}
// NewTierStateMachine creates a new TierStateMachine instance.
func NewTierStateMachine(leader LeaderEndPoint, replicaMgr ReplicaManager, useFutureLog bool) *TierStateMachine {
return &TierStateMachine{
leader: leader,
replicaMgr: replicaMgr,
useFutureLog: useFutureLog,
}
}
// Start starts the tier state machine for the provided topic partition.
func (tsm *TierStateMachine) Start(topicPartition common.TopicPartition, currentFetchState PartitionFetchState, fetchPartitionData PartitionData) (PartitionFetchState, error) {
epochAndLeaderLocalStartOffset, err := tsm.leader.FetchEarliestLocalOffset(topicPartition, currentFetchState.CurrentLeaderEpoch())
if err != nil {
return PartitionFetchState{}, err
}
epoch := epochAndLeaderLocalStartOffset.LeaderEpoch()
leaderLocalStartOffset := epochAndLeaderLocalStartOffset.Offset()
var unifiedLog log.UnifiedLog
if tsm.useFutureLog {
unifiedLog, err = tsm.replicaMgr.FutureLogOrException(topicPartition)
} else {
unifiedLog, err = tsm.replicaMgr.LocalLogOrException(topicPartition)
}
if err != nil {
return PartitionFetchState{}, err
}
offsetToFetch, err := tsm.buildRemoteLogAuxState(topicPartition, currentFetchState.CurrentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.LogStartOffset(), unifiedLog)
if err != nil {
return PartitionFetchState{}, err
}
fetchLatestOffsetResult, err := tsm.leader.FetchLatestOffset(topicPartition, currentFetchState.CurrentLeaderEpoch())
if err != nil {
return PartitionFetchState{}, err
}
leaderEndOffset := fetchLatestOffsetResult.Offset()
initialLag := leaderEndOffset - offsetToFetch
return PartitionFetchState{
TopicID: currentFetchState.TopicID(),
OffsetToFetch: offsetToFetch,
InitialLag: initialLag,
CurrentLeaderEpoch: currentFetchState.CurrentLeaderEpoch(),
Fetching: true,
UnifiedLog: unifiedLog,
LatestEpoch: unifiedLog.LatestEpoch(),
}, nil
}
// buildRemoteLogAuxState builds the required state for this partition from leader and remote storage.
func (tsm *TierStateMachine) buildRemoteLogAuxState(topicPartition common.TopicPartition, currentLeaderEpoch int, leaderLocalLogStartOffset int64, epochForLeaderLocalLogStartOffset int, leaderLogStartOffset int64, unifiedLog log.UnifiedLog) (int64, error) {
if !unifiedLog.RemoteStorageSystemEnable() || !unifiedLog.Config().RemoteStorageEnable() {
return 0, errors.New("remote log storage is not yet enabled")
}
rlm := tsm.replicaMgr.RemoteLogManager()
if rlm == nil {
return 0, errors.New("RemoteLogManager is not yet instantiated")
}
previousOffsetToLeaderLocalLogStartOffset := leaderLocalLogStartOffset - 1
targetEpoch := epochForLeaderLocalLogStartOffset
if epochForLeaderLocalLogStartOffset != 0 {
earlierEpochEndOffset, err := tsm.fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch)
if err != nil {
return 0, err
}
if earlierEpochEndOffset.EndOffset() > previousOffsetToLeaderLocalLogStartOffset {
targetEpoch = earlierEpochEndOffset.LeaderEpoch()
}
}
remoteLogSegmentMetadata, err := rlm.FetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
if err != nil {
return 0, err
}
nextOffset := remoteLogSegmentMetadata.EndOffset() + 1
partition := tsm.replicaMgr.GetPartitionOrException(topicPartition)
partition.TruncateFullyAndStartAt(nextOffset, tsm.useFutureLog, leaderLogStartOffset)
unifiedLog.MaybeIncrementLogStartOffset(leaderLogStartOffset, log.LeaderOffsetIncremented)
unifiedLog.MaybeIncrementLocalLogStartOffset(nextOffset, log.LeaderOffsetIncremented)
epochs, err := tsm.readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)
if err != nil {
return 0, err
}
if unifiedLog.LeaderEpochCache().IsDefined() {
unifiedLog.LeaderEpochCache().Get().Assign(epochs)
}
fmt.Printf("Updated the epoch cache from remote tier till offset: %d with size: %d for %v\n", leaderLocalLogStartOffset, len(epochs), partition)
err = tsm.buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, rlm)
if err != nil {
return 0, err
}
fmt.Printf("Built the leader epoch cache and producer snapshots from remote tier for %v, with active producers size: %d, leaderLogStartOffset: %d, and logEndOffset: %d\n",
partition, unifiedLog.ProducerStateManager().ActiveProducers().Size(), leaderLogStartOffset, nextOffset)
return nextOffset, nil
}
// fetchEarlierEpochEndOffset fetches the end-offset for the epoch earlier to the given epoch from the leader.
func (tsm *TierStateMachine) fetchEarlierEpochEndOffset(epoch int, partition common.TopicPartition, currentLeaderEpoch int) (common.EpochEndOffset, error) {
previousEpoch := epoch - 1
partitionsWithEpochs := map[common.TopicPartition]common.OffsetForLeaderPartition{
partition: {
Partition: partition.Partition(),
CurrentLeaderEpoch: currentLeaderEpoch,
LeaderEpoch: previousEpoch,
},
}
response, err := tsm.leader.FetchEpochEndOffsets(partitionsWithEpochs)
if err != nil {
return common.EpochEndOffset{}, err
}
epochEndOffset, ok := response[partition]
if !ok {
return common.EpochEndOffset{}, errors.New("no response received for partition")
}
if epochEndOffset.ErrorCode() != common.ErrorsNone.Code() {
return common.EpochEndOffset{}, errors.New(epochEndOffset.ErrorMessage())
}
return epochEndOffset, nil
}
// readLeaderEpochCheckpoint reads the leader epoch checkpoint from remote storage.
func (tsm *TierStateMachine) readLeaderEpochCheckpoint(rlm storage.RemoteLogManager, remoteLogSegmentMetadata storage.RemoteLogSegmentMetadata) ([]storage.EpochEntry, error) {
inputStream, err := rlm.StorageManager().FetchIndex(remoteLogSegmentMetadata, storage.IndexTypeLeaderEpoch)
if err != nil {
return nil, err
}
defer inputStream.Close()
readBuffer := storage.NewCheckpointReadBuffer("", inputStream)
return readBuffer.Read()
}
// buildProducerSnapshotFile restores the producer snapshot file from remote storage.
func (tsm *TierStateMachine) buildProducerSnapshotFile(unifiedLog log.UnifiedLog, nextOffset int64, remoteLogSegmentMetadata storage.RemoteLogSegmentMetadata, rlm storage.RemoteLogManager) error {
snapshotFile := log.ProducerSnapshotFile(unifiedLog.Dir(), nextOffset)
tmpSnapshotFile := snapshotFile + ".tmp"
inputStream, err := rlm.StorageManager().FetchIndex(remoteLogSegmentMetadata, storage.IndexTypeProducerSnapshot)
if err != nil {
return err
}
defer inputStream.Close()
tmpFile, err := os.Create(tmpSnapshotFile)
if err != nil {
return err
}
defer tmpFile.Close()
_, err = io.Copy(tmpFile, inputStream)
if err != nil {
return err
}
err = os.Rename(tmpSnapshotFile, snapshotFile)
if err != nil {
return err
}
unifiedLog.ProducerStateManager().TruncateFullyAndReloadSnapshots()
unifiedLog.LoadProducerState(nextOffset)
return nil
}