Skip to content

Instantly share code, notes, and snippets.

@haru01
Last active July 21, 2025 22:08
Show Gist options
  • Select an option

  • Save haru01/f25aad7500b9dcbcab56e525a03e8351 to your computer and use it in GitHub Desktop.

Select an option

Save haru01/f25aad7500b9dcbcab56e525a03e8351 to your computer and use it in GitHub Desktop.

ドメインイベント導入ガイド - Effect-TS実装

要約

このドキュメントでは、ドメイン駆動設計におけるドメインイベントの導入と、Effect-TSを用いた実装方法について説明します。

Effect-TSを使うポイント

  • 型安全なエラーハンドリング: Effect.Effect<成功型, エラー型, 依存型>により、発生しうるエラーが型レベルで明示され、コンパイル時にエラー処理の漏れを防げます
  • 依存性注入: Context.Tagを使ったDIにより、テスト時にモックへの差し替えが容易

1. ドメイン層: イベント発行と網羅性チェックの追加

ドメインロジック(状態遷移)が、新しい状態だけでなくドメインイベントも生成するように変更します。また、switch文に網羅性チェックを追加します。

src/domain.ts

import { Schema, Effect, Context, Brand, Data } from "effect";

// --- データ構造 (スキーマ) ---
// 【Effect】Brand型: 実行時のバリデーションと型安全性を両立
type NonEmptyString = string & Brand.Brand<"NonEmptyString">;
const NonEmptyString = Schema.String.pipe(Schema.minLength(1), Schema.brand("NonEmptyString"));

// 【Effect】TaskId専用のBrand型: IDの型安全性を保証
type TaskId = string & Brand.Brand<"TaskId">;
const TaskId = Schema.String.pipe(
  Schema.minLength(1, { message: () => "Task ID cannot be empty" }),
  Schema.brand("TaskId")
);

// 【Effect】Schema.Class: 型安全なデータクラスの定義
// Tagged Union(ADT)により、状態を型レベルで表現
class Todo extends Schema.Class<Todo>()({ _tag: Schema.Literal("Todo"), createdAt: Schema.Date }) {}
class InProgress extends Schema.Class<InProgress>()({ _tag: Schema.Literal("InProgress"), startedAt: Schema.Date }) {}
class Done extends Schema.Class<Done>()({ _tag: Schema.Literal("Done"), completedAt: Schema.Date }) {}
export const TaskStatus = Schema.Union(Todo, InProgress, Done);
export type TaskStatus = Schema.To<typeof TaskStatus>;

// 【Effect】Data.Class: イミュータブルなデータ構造を簡潔に定義
export class Task extends Data.Class<{
  readonly id: TaskId;
  readonly title: NonEmptyString;
  readonly status: TaskStatus;
}> {}

// --- ドメインイベント ---
// 【Effect】Data.Class: イミュータブルなデータ構造を簡潔に定義
class TaskCompleted extends Data.Class<{
  readonly _tag: "TaskCompleted";
  readonly taskId: TaskId;  // TaskId型を使用
  readonly completedAt: Date;
}> {}
export type DomainEvent = TaskCompleted;

// --- ドメインエラー ---
// 【Effect】Data.TaggedError: 型安全なエラークラス、パターンマッチング可能
export class TaskAlreadyCompletedError extends Data.TaggedError("TaskAlreadyCompletedError") {}
export class TaskNotInProgressError extends Data.TaggedError("TaskNotInProgressError") {}
type DomainError = TaskAlreadyCompletedError | TaskNotInProgressError;

// --- ドメインロジック (状態とイベントを返す) ---
// 【Effect】Effect.Effect<成功型, エラー型>: 型レベルでエラーハンドリングを強制
export const completeTask = (
  task: Task
): Effect.Effect<readonly [Task, ReadonlyArray<DomainEvent>], DomainError> => {
  switch (task.status._tag) {
    case "Done":
      // 【Effect】Effect.fail: 型安全なエラー返却
      return Effect.fail(new TaskAlreadyCompletedError());
    case "Todo":
      return Effect.fail(new TaskNotInProgressError());
    case "InProgress": {
      const completedAt = new Date();
      // 【Effect】Data.Class: 新しい状態のTaskをイミュータブルに生成
      const updatedTask = new Task({ ...task, status: new Done({ completedAt }) });
      // 生成されたドメインイベント
      const event = new TaskCompleted({ taskId: task.id, completedAt });
      // 【Effect】Effect.succeed: 成功結果を型安全に返却
      return Effect.succeed([updatedTask, [event]]);
    // 網羅性チェック: 将来新しい状態が追加され、caseが漏れているとコンパイルエラーになる
    default: {
      const _exhaustiveCheck: never = task.status;
      // 【Effect】Effect.die: 回復不可能なエラー(プログラムのバグ)を表現
      return Effect.die(_exhaustiveCheck);
    }
  }
};

// --- インフラ層との契約 ---
export class TaskNotFoundError extends Data.TaggedError("TaskNotFoundError") {}
export class SaveError extends Data.TaggedError("SaveError")<{ cause: unknown }> {}
export class PublishError extends Data.TaggedError("PublishError")<{ cause: unknown }> {}

export interface TaskRepository {
  // 【Effect】各メソッドが発生しうるエラーを型で明示
  readonly findById: (id: TaskId) => Effect.Effect<Task, TaskNotFoundError>;
  readonly save: (task: Task) => Effect.Effect<void, SaveError>;
}
// 【Effect】Context.Tag: 依存性注入のためのタグ、テスト時にモック差し替えが容易
export const TaskRepository = Context.Tag<TaskRepository>();

export interface EventPublisher {
  readonly publish: (events: ReadonlyArray<DomainEvent>) => Effect.Effect<void, PublishError>;
}
export const EventPublisher = Context.Tag<EventPublisher>();

2. アプリケーション層: イベント発行の追加

ユースケースにイベント発行のステップを追加します。saveとpublishは並行して実行できます。

src/usecase.ts

import { Effect } from "effect";
import { 
  TaskRepository, 
  EventPublisher,
  completeTask,
  DomainError,
  TaskNotFoundError, 
  SaveError,
  PublishError
} from "./domain";

// 【Effect】Union型でアプリケーション層で扱うすべてのエラーを集約
type AppError = TaskNotFoundError | SaveError | PublishError | DomainError;

// 【Effect】Effect.Effect<成功型, エラー型, 依存型>で依存関係を型で明示
export const completeTaskUseCase = (taskId: string): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  // 【Effect】Effect.gen: do記法風の構文糖衣、非同期処理を同期的に記述
  Effect.gen(function*() {
    // 【Effect】依存性の注入、Context.Tagで定義されたサービスを取得
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    // 【Effect】yield*でEffect値を展開、エラーハンドリングは自動的に上位に伝播
    const task = yield* repo.findById(taskId);
    // ドメインロジックは新しい状態とイベントを返す
    const [updatedTask, events] = yield* completeTask(task);

    // 【Effect】Effect.all: 複数のEffectを並行実行、どちらかが失敗すれば全体が失敗
    // concurrency: "inherit"で親のコンカレンシー設定を継承
    yield* Effect.all([
      repo.save(updatedTask),
      publisher.publish(events)
    ], { concurrency: "inherit" });

    console.log(`[App] タスク「${updatedTask.title}」を正常に完了し、イベントを発行しました。`);
    return updatedTask;
  });

3. プロダクトコード: 本番用の実装と実行

EventPublisherの本番用Layerを追加します。

インフラ層(本番用)

src/infra.ts

import { Layer, Effect } from "effect";
import { 
  Task, TaskRepository, TaskNotFoundError, SaveError,
  EventPublisher, PublishError, DomainEvent,
  InProgress
} from "./domain";

// -- Repository --
const db = new Map<TaskId, Task>([
  ["task-1" as TaskId, new Task({ 
    id: "task-1" as TaskId, 
    title: "最終課題" as NonEmptyString, 
    status: new InProgress({ startedAt: new Date() }) 
  })],
]);

// 【Effect】Layer.succeed: 依存性の具体的な実装を提供するLayer
// テスト時には異なるLayerに差し替え可能
export const LiveTaskRepository = Layer.succeed(TaskRepository, {
  findById: (id) => db.has(id) ? Effect.succeed(db.get(id)!) : Effect.fail(new TaskNotFoundError()),
  // 【Effect】Effect.sync: 同期的な副作用をEffectでラップ
  save: (task) => Effect.sync(() => { db.set(task.id, task); }),
});

// -- Event Publisher --
export const LiveEventPublisher = Layer.succeed(EventPublisher, {
  publish: (events) => Effect.sync(() => {
    console.log(`[Event] ${events.length}件のイベントを発行:`, events.map(e => e._tag));
  }),
});

エントリーポイント

src/main.ts

import { Effect, Layer } from "effect";
import { completeTaskUseCase } from "./usecase";
import { LiveTaskRepository, LiveEventPublisher } from "./infra";

const program = completeTaskUseCase("task-1" as TaskId);

// 【Effect】Layer.merge: 複数のLayerを結合して依存関係を解決
const AppLayer = Layer.merge(LiveTaskRepository, LiveEventPublisher);

// 【Effect】Effect.provide: プログラムに依存関係を注入して実行可能な状態にする
// Effect.runPromise: EffectをPromiseに変換して実行
Effect.runPromise(Effect.provide(program, AppLayer));

4. テストコード

テストでもEventPublisherのモックを提供し、イベントが正しく発行されたかを検証します。

tests/usecase.test.ts

import { describe, it, expect, vi, assert } from "vitest";
import { Effect, Exit, Layer } from "effect";
import { Task, TaskRepository, EventPublisher, InProgress } from "../src/domain";
import { completeTaskUseCase } from "../src/usecase";

// --- テスト用のモック実装 ---
const mockRepo = { findById: vi.fn(), save: vi.fn() };
// 【Effect】Layer.succeed: テスト用のモック実装をLayerとして定義
const TestTaskRepository = Layer.succeed(TaskRepository, mockRepo);

const mockPublisher = { publish: vi.fn() };
const TestEventPublisher = Layer.succeed(EventPublisher, mockPublisher);

// 【Effect】Layer.merge: テスト用のLayerを結合
const TestLayer = Layer.merge(TestTaskRepository, TestEventPublisher);

describe("completeTaskUseCase", () => {
  it("正常系: タスクを完了し、ドメインイベントを発行する", async () => {
    // Arrange
    const task = new Task({ 
      id: "task-1" as TaskId, 
      title: "テスト対象" as NonEmptyString, 
      status: new InProgress({ startedAt: new Date() }) 
    });
    // 【Effect】Effect.succeed/void: モックの戻り値をEffectで包む
    mockRepo.findById.mockReturnValue(Effect.succeed(task));
    mockRepo.save.mockReturnValue(Effect.void);
    mockPublisher.publish.mockReturnValue(Effect.void);

    // Act
    const program = completeTaskUseCase("task-1" as TaskId);
    // 【Effect】Effect.runPromiseExit: Effectを実行し、Exit(成功/失敗)を取得
    const result = await Effect.runPromiseExit(Effect.provide(program, TestLayer));

    // Assert
    // 【Effect】Exit.match: 成功/失敗をパターンマッチングで処理
    Exit.match(result, {
      onSuccess: () => {
        // 1. 保存が呼ばれたか
        expect(mockRepo.save).toHaveBeenCalledTimes(1);
        
        // 2. イベント発行が呼ばれたか
        expect(mockPublisher.publish).toHaveBeenCalledTimes(1);
        const publishedEvents = mockPublisher.publish.mock.calls[0][0];
        
        // 3. 発行されたイベントの内容は正しいか
        expect(publishedEvents).toHaveLength(1);
        expect(publishedEvents[0]._tag).toBe("TaskCompleted");
        expect(publishedEvents[0].taskId).toBe("task-1" as TaskId);
      },
      onFailure: (cause) => assert.fail(`テストが失敗しました: ${cause.pretty}`),
    });
  });
});

まとめ

この実装により、以下が実現されます:

  • ドメインの豊かな表現: 状態変更だけでなく、何が起こったかをイベントとして表現
  • 疎結合なアーキテクチャ: イベントを通じた非同期処理や他のコンテキストとの連携
  • テスト容易性: モックを使った単体テストでイベント発行の検証が可能
  • 型安全性: Effect-TSとTypeScriptの組み合わせによる堅牢な実装
@haru01
Copy link
Author

haru01 commented Jul 21, 2025

ドメインイベント導入ガイド - Effect-TS実装

要約

このドキュメントでは、ドメイン駆動設計におけるドメインイベントの導入と、Effect-TSを用いた実装方法について説明します。

Effect-TSを使うポイント

  • 型安全なエラーハンドリング: Effect.Effect<成功型, エラー型, 依存型>により、発生しうるエラーが型レベルで明示され、コンパイル時にエラー処理の漏れを防げます
  • 依存性注入: Context.Tagを使ったDIにより、テスト時にモックへの差し替えが容易

1. ドメイン層: イベント発行と網羅性チェックの追加

ドメインロジック(状態遷移)が、新しい状態だけでなくドメインイベントも生成するように変更します。また、switch文に網羅性チェックを追加します。

src/domain.ts

import { Schema, Effect, Context, Brand, Data } from "effect";

// --- データ構造 (スキーマ) ---
// 【Effect】Brand型: 実行時のバリデーションと型安全性を両立
type NonEmptyString = string & Brand.Brand<"NonEmptyString">;
const NonEmptyString = Schema.String.pipe(Schema.minLength(1), Schema.brand("NonEmptyString"));

// 【Effect】TaskId専用のBrand型: IDの型安全性を保証
type TaskId = string & Brand.Brand<"TaskId">;
const TaskId = Schema.String.pipe(
  Schema.minLength(1, { message: () => "Task ID cannot be empty" }),
  Schema.brand("TaskId")
);

// 【Effect】Schema.Class: 型安全なデータクラスの定義
// Tagged Union(ADT)により、状態を型レベルで表現
class Todo extends Schema.Class<Todo>()({ _tag: Schema.Literal("Todo"), createdAt: Schema.Date }) {}
class InProgress extends Schema.Class<InProgress>()({ _tag: Schema.Literal("InProgress"), startedAt: Schema.Date }) {}
class Done extends Schema.Class<Done>()({ _tag: Schema.Literal("Done"), completedAt: Schema.Date }) {}
export const TaskStatus = Schema.Union(Todo, InProgress, Done);
export type TaskStatus = Schema.To<typeof TaskStatus>;

// 【Effect】Data.Class: イミュータブルなデータ構造を簡潔に定義
export class Task extends Data.Class<{
  readonly id: TaskId;
  readonly title: NonEmptyString;
  readonly status: TaskStatus;
}> {}

// --- ドメインイベント ---
// 【Effect】Data.Class: イミュータブルなデータ構造を簡潔に定義
class TaskCompleted extends Data.Class<{
  readonly _tag: "TaskCompleted";
  readonly taskId: TaskId;  // TaskId型を使用
  readonly completedAt: Date;
}> {}
export type DomainEvent = TaskCompleted;

// --- ドメインエラー ---
// 【Effect】Data.TaggedError: 型安全なエラークラス、パターンマッチング可能
export class TaskAlreadyCompletedError extends Data.TaggedError("TaskAlreadyCompletedError") {}
export class TaskNotInProgressError extends Data.TaggedError("TaskNotInProgressError") {}
type DomainError = TaskAlreadyCompletedError | TaskNotInProgressError;

// --- ドメインロジック (状態とイベントを返す) ---
// 【Effect】Effect.Effect<成功型, エラー型>: 型レベルでエラーハンドリングを強制
export const completeTask = (
  task: Task
): Effect.Effect<readonly [Task, ReadonlyArray<DomainEvent>], DomainError> => {
  switch (task.status._tag) {
    case "Done":
      // 【Effect】Effect.fail: 型安全なエラー返却
      return Effect.fail(new TaskAlreadyCompletedError());
    case "Todo":
      return Effect.fail(new TaskNotInProgressError());
    case "InProgress": {
      const completedAt = new Date();
      // 【Effect】Data.Class: 新しい状態のTaskをイミュータブルに生成
      const updatedTask = new Task({ ...task, status: new Done({ completedAt }) });
      // 生成されたドメインイベント
      const event = new TaskCompleted({ taskId: task.id, completedAt });
      // 【Effect】Effect.succeed: 成功結果を型安全に返却
      return Effect.succeed([updatedTask, [event]]);
    }
    // 網羅性チェック: 将来新しい状態が追加され、caseが漏れているとコンパイルエラーになる
    default: {
      const _exhaustiveCheck: never = task.status;
      // 【Effect】Effect.die: 回復不可能なエラー(プログラムのバグ)を表現
      return Effect.die(_exhaustiveCheck);
    }
  }
};

// --- インフラ層との契約 ---
export class TaskNotFoundError extends Data.TaggedError("TaskNotFoundError") {}
export class SaveError extends Data.TaggedError("SaveError")<{ cause: unknown }> {}
export class PublishError extends Data.TaggedError("PublishError")<{ cause: unknown }> {}

export interface TaskRepository {
  // 【Effect】各メソッドが発生しうるエラーを型で明示
  readonly findById: (id: TaskId) => Effect.Effect<Task, TaskNotFoundError>;
  readonly save: (task: Task) => Effect.Effect<void, SaveError>;
}
// 【Effect】Context.Tag: 依存性注入のためのタグ、テスト時にモック差し替えが容易
export const TaskRepository = Context.Tag<TaskRepository>();

export interface EventPublisher {
  readonly publish: (events: ReadonlyArray<DomainEvent>) => Effect.Effect<void, PublishError>;
}
export const EventPublisher = Context.Tag<EventPublisher>();

リトライ戦略の選択肢と実装場所

1. アプリケーション層でのリトライ(推奨)

ユースケース内でイベント発行のみにリトライを適用する方法:

アプリケーション層(ユースケース内)のリトライ実装 ⭐

基本的なリトライパターン

import { Effect, Schedule } from "effect";
import { 
  TaskRepository, 
  EventPublisher,
  completeTask,
  DomainError,
  TaskNotFoundError, 
  SaveError,
  PublishError
} from "./domain";

type AppError = TaskNotFoundError | SaveError | PublishError | DomainError | ValidationError;

// パターン1: シンプルなリトライ
export const completeTaskUseCase = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    // タスク保存は確実に実行(リトライなし)
    yield* repo.save(updatedTask);
    
    // 【リトライ戦略1】基本的な指数バックオフリトライ
    yield* Effect.retry(
      publisher.publish(events),
      {
        times: 3, // 最大3回リトライ
        schedule: Schedule.exponential("100 millis") // 100ms, 200ms, 400ms...
      }
    );

    console.log(`[App] タスク「${updatedTask.title}」を正常に完了し、イベントを発行しました。`);
    return updatedTask;
  });

条件付きリトライパターン

// パターン2: 特定のエラーのみリトライ
export const completeTaskUseCaseWithConditionalRetry = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 【リトライ戦略2】条件付きリトライ - 一時的なエラーのみ
    yield* Effect.retry(
      publisher.publish(events),
      Schedule.exponential("100 millis")
        .pipe(Schedule.intersect(Schedule.recurs(3))) // 最大3回
        .pipe(Schedule.whileInput((error: PublishError) => {
          // ネットワークエラーや一時的な問題のみリトライ
          const cause = error.cause;
          return (
            cause instanceof NetworkError ||
            cause instanceof TemporaryServiceUnavailableError ||
            (cause instanceof Error && cause.message.includes("timeout"))
          );
        }))
    );

    return updatedTask;
  });

段階的リトライパターン

// パターン3: 段階的リトライ戦略
export const completeTaskUseCaseWithStagedRetry = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 【リトライ戦略3】段階的リトライ
    const eventPublishWithRetry = publisher.publish(events).pipe(
      // 第1段階: 短い間隔で3回リトライ(一時的な問題対応)
      Effect.retry({
        times: 3,
        schedule: Schedule.exponential("50 millis") // 50ms, 100ms, 200ms
      }),
      // 第2段階: 長い間隔で2回リトライ(サービス復旧待ち)
      Effect.catchAll((error) => {
        if (error._tag === "PublishError") {
          console.warn(`[App] イベント発行の第1段階リトライ失敗、第2段階を開始: ${error.cause}`);
          return Effect.retry(
            publisher.publish(events),
            {
              times: 2,
              schedule: Schedule.exponential("2 seconds") // 2s, 4s
            }
          );
        }
        return Effect.fail(error);
      })
    );

    yield* eventPublishWithRetry;
    return updatedTask;
  });

イベント種別による異なるリトライ戦略

// パターン4: イベント種別による戦略変更
export const completeTaskUseCaseWithEventTypeRetry = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 【リトライ戦略4】イベント種別による戦略分岐
    const publishEventsByType = Effect.gen(function*() {
      for (const event of events) {
        const retryStrategy = getRetryStrategyForEvent(event);
        yield* Effect.retry(
          publisher.publish([event]), // 1つずつ発行
          retryStrategy
        );
      }
    });

    yield* publishEventsByType;
    return updatedTask;
  });

// イベント種別によるリトライ戦略の決定
const getRetryStrategyForEvent = (event: DomainEvent) => {
  switch (event._tag) {
    case "TaskCompleted":
      // 重要なイベントなので積極的にリトライ
      return {
        times: 5,
        schedule: Schedule.exponential("200 millis")
      };
    case "TaskUpdated":
      // 軽微なイベントなので軽くリトライ
      return {
        times: 2,
        schedule: Schedule.exponential("100 millis")
      };
    default:
      return {
        times: 3,
        schedule: Schedule.exponential("150 millis")
      };
  }
};

ログ付きリトライパターン

// パターン5: 詳細なログ付きリトライ
export const completeTaskUseCaseWithLogging = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 【リトライ戦略5】ログ付きリトライ
    const eventPublishWithLogging = publisher.publish(events).pipe(
      Effect.retry({
        times: 3,
        schedule: Schedule.exponential("100 millis")
          .pipe(Schedule.tapInput((retryCount, error: PublishError) => 
            Effect.sync(() => {
              console.warn(`[App] イベント発行リトライ ${retryCount + 1}/3: ${error.cause}`);
            })
          ))
      }),
      Effect.tapError((error) => 
        Effect.sync(() => {
          console.error(`[App] イベント発行が最終的に失敗しました: ${error.cause}`);
          // メトリクス送信、アラート等
        })
      ),
      Effect.tap(() =>
        Effect.sync(() => {
          console.info(`[App] イベント発行成功: ${events.length}件`);
        })
      )
    );

    yield* eventPublishWithLogging;
    return updatedTask;
  });

柔軟な設定可能リトライパターン

// パターン6: 設定可能なリトライ戦略
interface RetryConfig {
  readonly maxRetries: number;
  readonly initialDelay: string;
  readonly maxDelay: string;
  readonly retryableErrors: readonly string[];
}

const defaultRetryConfig: RetryConfig = {
  maxRetries: 3,
  initialDelay: "100 millis",
  maxDelay: "5 seconds",
  retryableErrors: ["NetworkError", "TimeoutError", "ServiceUnavailableError"]
};

export const completeTaskUseCaseConfigurable = (
  taskId: TaskId,
  retryConfig: RetryConfig = defaultRetryConfig
): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 【リトライ戦略6】設定可能なリトライ
    const retrySchedule = Schedule.exponential(retryConfig.initialDelay)
      .pipe(Schedule.intersect(Schedule.recurs(retryConfig.maxRetries)))
      .pipe(Schedule.whileInput((error: PublishError) => {
        const errorName = error.cause?.constructor.name || "UnknownError";
        return retryConfig.retryableErrors.includes(errorName);
      }))
      .pipe(Schedule.upTo(retryConfig.maxDelay)); // 最大遅延を制限

    yield* Effect.retry(publisher.publish(events), retrySchedule);
    return updatedTask;
  });

テスト用のリトライ戦略

// テスト環境用: 高速リトライ
export const completeTaskUseCaseForTest = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // テスト用: 高速リトライ(テスト実行時間短縮)
    yield* Effect.retry(
      publisher.publish(events),
      {
        times: 2, // 少ない回数
        schedule: Schedule.exponential("1 millis") // 高速
      }
    );

    return updatedTask;
  });

これらのパターンから、要件に応じて適切なリトライ戦略を選択できます。アプリケーション層でリトライを実装することで、ビジネスロジックの一部として明示的にリトライ戦略を表現でき、テストも容易になります。

src/usecase.ts

import { Effect, Schedule } from "effect";
import { 
  TaskRepository, 
  EventPublisher,
  completeTask,
  DomainError,
  TaskNotFoundError, 
  SaveError,
  PublishError
} from "./domain";

// ユースケースでバリデーションエラーを含める
type AppError = TaskNotFoundError | SaveError | PublishError | DomainError | ValidationError;

export const completeTaskUseCase = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    // タスク保存は確実に実行(リトライなし)
    yield* repo.save(updatedTask);
    
    // 【Effect】イベント発行のみリトライ - ビジネス的に重要だが失敗しても許容可能
    yield* Effect.retry(
      publisher.publish(events),
      {
        times: 3, // 最大3回リトライ
        schedule: Schedule.exponential("100 millis") // 指数バックオフ
      }
    );

    console.log(`[App] タスク「${updatedTask.title}」を正常に完了し、イベントを発行しました。`);
    return updatedTask;
  });

// より詳細なリトライ戦略の例
export const completeTaskUseCaseWithAdvancedRetry = (taskId: TaskId): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  Effect.gen(function*() {
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    const task = yield* repo.findById(taskId);
    const [updatedTask, events] = yield* completeTask(task);

    yield* repo.save(updatedTask);
    
    // 特定のエラーのみリトライし、それ以外は即座に失敗
    yield* Effect.retry(
      publisher.publish(events),
      Schedule.exponential("100 millis")
        .pipe(Schedule.intersect(Schedule.recurs(3))) // 最大3回
        .pipe(Schedule.whileInput((error: PublishError) => {
          // ネットワークエラーなど一時的な問題のみリトライ
          return error.cause instanceof NetworkError;
        }))
    );

    return updatedTask;
  });

2. インフラ層でのリトライ

EventPublisherの実装レベルでリトライを組み込む方法:

src/infra.ts

// リトライ機能付きEventPublisher
export const LiveEventPublisherWithRetry = Layer.succeed(EventPublisher, {
  publish: (events) => Effect.retry(
    Effect.tryPromise({
      try: async () => {
        // 実際のメッセージキューへの送信
        await sendToMessageQueue(events);
        console.log(`[Event] ${events.length}件のイベントを発行:`, events.map(e => e._tag));
      },
      catch: (error) => new PublishError({ cause: error })
    }),
    {
      times: 3,
      schedule: Schedule.exponential("200 millis")
    }
  )
});

3. 呼び出し元でのエラーハンドリング

ユースケースの呼び出し側でリトライや代替処理を行う場合:

src/main.ts

import { Effect, Either, Schedule, Layer } from "effect";
import { completeTaskUseCase } from "./usecase";
import { LiveTaskRepository, LiveEventPublisher } from "./infra";

// パターン1: ユースケース全体をリトライ
const programWithFullRetry = completeTaskUseCase("task-1" as TaskId).pipe(
  Effect.retry({
    times: 5,
    schedule: Schedule.exponential("1 second")
  })
);

// パターン2: エラー種別による詳細制御
const programWithErrorHandling = completeTaskUseCase("task-1" as TaskId).pipe(
  Effect.catchAll((error) => {
    switch (error._tag) {
      case "PublishError":
        console.error("イベント発行に失敗しました。後で再試行します:", error);
        // 非同期でリトライキューに追加するなど
        return scheduleEventRetry(error);
      case "TaskNotFoundError":
        console.error("タスクが見つかりません:", error);
        return Effect.fail(error);
      default:
        return Effect.fail(error);
    }
  })
);

// パターン3: Either を使った詳細なエラーハンドリング
const programWithEither = Effect.gen(function*() {
  const result = yield* Effect.either(
    completeTaskUseCase("task-1" as TaskId)
  );
  
  return Either.match(result, {
    onLeft: (error) => {
      if (error._tag === "PublishError") {
        console.error("イベント発行失敗 - 代替処理を実行");
        // イベントを別の場所に保存、後で再処理
        return saveEventForRetry(error);
      }
      throw error; // その他のエラーは再スロー
    },
    onRight: (task) => {
      console.log("タスク完了:", task);
      return task;
    }
  });
});

// 非同期でのイベント再試行キューイング
const scheduleEventRetry = (error: PublishError): Effect.Effect<string> =>
  Effect.sync(() => {
    // Redis、データベース、またはメッセージキューに保存
    console.log("イベントを再試行キューに追加しました");
    return "イベント発行は後で再試行されます";
  });

const AppLayer = Layer.merge(LiveTaskRepository, LiveEventPublisher);
Effect.runPromise(Effect.provide(programWithErrorHandling, AppLayer));

推奨アプローチの選択指針

1. アプリケーション層(ユースケース内)- 最推奨 ⭐

メリット:

  • ビジネスロジックとしてリトライ戦略が明示的
  • ユースケースごとに異なるリトライ戦略を適用可能
  • テストしやすい(モックでリトライ動作を検証)
  • タスク保存とイベント発行を分離して制御可能

適用ケース:

  • イベント発行がビジネス的に重要だが、失敗してもタスク完了は成功扱いにしたい場合
  • 特定のユースケースのみ特別なリトライ戦略が必要な場合

2. 呼び出し元でのエラーハンドリング

メリット:

  • 横断的関心事として統一的なエラー処理
  • 複数のユースケースに共通のリトライ戦略を適用
  • ビジネスロジックをクリーンに保てる

適用ケース:

  • 全体的なエラーハンドリング戦略が必要
  • ユースケース失敗時の代替フローが複雑な場合
  • 監視・ロギング・アラートを統一したい場合

3. インフラ層

メリット:

  • 技術的な問題(ネットワーク等)に対する透明なリトライ
  • アプリケーション層が技術的詳細を意識しなくて済む

適用ケース:

  • 低レベルの技術的問題(ネットワーク、一時的なサービス停止)のみ
  • すべてのイベント発行に共通のリトライが必要

実装の組み合わせ例

// 多層防御的なアプローチ
const robustCompleteTask = completeTaskUseCase("task-1" as TaskId).pipe(
  // 1. ユースケース内でイベント発行リトライ(3回)
  // 2. それでも失敗した場合、呼び出し元で全体リトライ(2回)
  Effect.retry({ times: 2, schedule: Schedule.exponential("5 seconds") }),
  // 3. 最終的に失敗した場合の代替処理
  Effect.catchAll((error) => {
    if (error._tag === "PublishError") {
      return scheduleEventRetry(error);
    }
    return Effect.fail(error);
  })
);

この設計により、堅牢で柔軟なイベント処理システムが構築できます。

2. アプリケーション層: イベント発行の追加とリトライ戦略

ユースケースにイベント発行のステップを追加します。saveとpublishは並行して実行できます。

src/usecase.ts

import { Effect, Schedule } from "effect";
import { 
  TaskRepository, 
  EventPublisher,
  completeTask,
  DomainError,
  TaskNotFoundError, 
  SaveError,
  PublishError
} from "./domain";

// 【Effect】Union型でアプリケーション層で扱うすべてのエラーを集約
type AppError = TaskNotFoundError | SaveError | PublishError | DomainError;

// 【Effect】Effect.Effect<成功型, エラー型, 依存型>で依存関係を型で明示
export const completeTaskUseCase = (taskId: string): Effect.Effect<Task, AppError, TaskRepository | EventPublisher> =>
  // 【Effect】Effect.gen: do記法風の構文糖衣、非同期処理を同期的に記述
  Effect.gen(function*() {
    // 【Effect】依存性の注入、Context.Tagで定義されたサービスを取得
    const repo = yield* TaskRepository;
    const publisher = yield* EventPublisher;

    // 【Effect】yield*でEffect値を展開、エラーハンドリングは自動的に上位に伝播
    const task = yield* repo.findById(taskId);
    // ドメインロジックは新しい状態とイベントを返す
    const [updatedTask, events] = yield* completeTask(task);

    // 【Effect】Effect.all: 複数のEffectを並行実行、どちらかが失敗すれば全体が失敗
    // concurrency: "inherit"で親のコンカレンシー設定を継承
    yield* Effect.all([
      repo.save(updatedTask),
      publisher.publish(events)
    ], { concurrency: "inherit" });

    console.log(`[App] タスク「${updatedTask.title}」を正常に完了し、イベントを発行しました。`);
    return updatedTask;
  });

3. プロダクトコード: 本番用の実装と実行

EventPublisherの本番用Layerを追加します。

インフラ層(本番用)

src/infra.ts

import { Layer, Effect } from "effect";
import { 
  Task, TaskRepository, TaskNotFoundError, SaveError,
  EventPublisher, PublishError, DomainEvent,
  InProgress
} from "./domain";

// -- Repository --
const db = new Map<TaskId, Task>([
  ["task-1" as TaskId, new Task({ 
    id: "task-1" as TaskId, 
    title: "最終課題" as NonEmptyString, 
    status: new InProgress({ startedAt: new Date() }) 
  })],
]);

// 【Effect】Layer.succeed: 依存性の具体的な実装を提供するLayer
// テスト時には異なるLayerに差し替え可能
export const LiveTaskRepository = Layer.succeed(TaskRepository, {
  findById: (id) => db.has(id) ? Effect.succeed(db.get(id)!) : Effect.fail(new TaskNotFoundError()),
  // 【Effect】Effect.sync: 同期的な副作用をEffectでラップ
  save: (task) => Effect.sync(() => { db.set(task.id, task); }),
});

// -- Event Publisher --
export const LiveEventPublisher = Layer.succeed(EventPublisher, {
  publish: (events) => Effect.sync(() => {
    console.log(`[Event] ${events.length}件のイベントを発行:`, events.map(e => e._tag));
  }),
});

エントリーポイント

src/main.ts

import { Effect, Layer, Schedule } from "effect";
import { completeTaskUseCase } from "./usecase";
import { LiveTaskRepository, LiveEventPublisher } from "./infra";

// パターン1: ユースケース全体をリトライ
const programWithFullRetry = completeTaskUseCase("task-1" as TaskId).pipe(
  Effect.retry({ times: 5, schedule: Schedule.exponential("1 second") })
);

// パターン2: エラー種別による詳細制御
const programWithErrorHandling = completeTaskUseCase("task-1" as TaskId).pipe(
  Effect.catchAll((error) => {
    if (error._tag === "PublishError") {
      console.error("イベント発行に失敗しました。後で再試行します:", error);
      // 非同期でリトライキューに追加
      return Effect.succeed("イベント発行は後で再試行されます");
    }
    return Effect.fail(error); // その他のエラーは再スロー
  })
);

// 複数のLayerを結合して提供
const AppLayer = Layer.merge(LiveTaskRepository, LiveEventPublisher);
Effect.runPromise(Effect.provide(programWithErrorHandling, AppLayer));

4. テストコード

テストでもEventPublisherのモックを提供し、イベントが正しく発行されたかを検証します。

tests/usecase.test.ts

import { describe, it, expect, vi, assert } from "vitest";
import { Effect, Exit, Layer } from "effect";
import { Task, TaskRepository, EventPublisher, InProgress } from "../src/domain";
import { completeTaskUseCase } from "../src/usecase";

// --- テスト用のモック実装 ---
const mockRepo = { findById: vi.fn(), save: vi.fn() };
// 【Effect】Layer.succeed: テスト用のモック実装をLayerとして定義
const TestTaskRepository = Layer.succeed(TaskRepository, mockRepo);

const mockPublisher = { publish: vi.fn() };
const TestEventPublisher = Layer.succeed(EventPublisher, mockPublisher);

// 【Effect】Layer.merge: テスト用のLayerを結合
const TestLayer = Layer.merge(TestTaskRepository, TestEventPublisher);

describe("completeTaskUseCase", () => {
  it("正常系: タスクを完了し、ドメインイベントを発行する", async () => {
    // Arrange
    const task = new Task({ 
      id: "task-1" as TaskId, 
      title: "テスト対象" as NonEmptyString, 
      status: new InProgress({ startedAt: new Date() }) 
    });
    // 【Effect】Effect.succeed/void: モックの戻り値をEffectで包む
    mockRepo.findById.mockReturnValue(Effect.succeed(task));
    mockRepo.save.mockReturnValue(Effect.void);
    mockPublisher.publish.mockReturnValue(Effect.void);

    // Act
    const program = completeTaskUseCase("task-1" as TaskId);
    // 【Effect】Effect.runPromiseExit: Effectを実行し、Exit(成功/失敗)を取得
    const result = await Effect.runPromiseExit(Effect.provide(program, TestLayer));

    // Assert
    // 【Effect】Exit.match: 成功/失敗をパターンマッチングで処理
    Exit.match(result, {
      onSuccess: () => {
        // 1. 保存が呼ばれたか
        expect(mockRepo.save).toHaveBeenCalledTimes(1);
        
        // 2. イベント発行が呼ばれたか
        expect(mockPublisher.publish).toHaveBeenCalledTimes(1);
        const publishedEvents = mockPublisher.publish.mock.calls[0][0];
        
        // 3. 発行されたイベントの内容は正しいか
        expect(publishedEvents).toHaveLength(1);
        expect(publishedEvents[0]._tag).toBe("TaskCompleted");
        expect(publishedEvents[0].taskId).toBe("task-1" as TaskId);
      },
      onFailure: (cause) => assert.fail(`テストが失敗しました: ${cause.pretty}`),
    });
  });
});

まとめ

この実装により、以下が実現されます:

  • ドメインの豊かな表現: 状態変更だけでなく、何が起こったかをイベントとして表現
  • 疎結合なアーキテクチャ: イベントを通じた非同期処理や他のコンテキストとの連携
  • テスト容易性: モックを使った単体テストでイベント発行の検証が可能
  • 型安全性: Effect-TSとTypeScriptの組み合わせによる堅牢な実装

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment