Skip to content

Instantly share code, notes, and snippets.

@MurageKibicho
Created March 22, 2025 03:48
Show Gist options
  • Save MurageKibicho/e4ae67c391cb3cf2829e33cdb3205f9e to your computer and use it in GitHub Desktop.
Save MurageKibicho/e4ae67c391cb3cf2829e33cdb3205f9e to your computer and use it in GitHub Desktop.
Forward step Diffusion
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <assert.h>
#define PI 3.14159265358979323846
#define MIN_BETA 0.0001
#define MAX_BETA 0.9999
float DiffusionInternalSigmoid(float x)
{
return 1 / (1 + exp(-x));
}
// Function to generate linearly spaced values between start and end
float *linspace(float start, float end, int arrayLength)
{
assert(arrayLength > 0);
float *result = (float*)malloc(arrayLength * sizeof(float));
assert(result != NULL);
if(arrayLength == 1){result[0] = start;return result;}
float step = (end - start) / (arrayLength - 1);
for(int i = 0; i < arrayLength; i++)
{
result[i] = start + i * step;
}
return result;
}
float *LinearBetaSchedule(int timeSteps)
{
float start = 0.0001;
float end = 0.02;
return linspace(start, end, timeSteps);
}
float *QuadraticBetaSchedule(int timeSteps)
{
float start = 0.0001;
float end = 0.02;
start = sqrt(start);
end = sqrt(end);
float *betas = linspace(start, end, timeSteps);
for(int i = 0; i < timeSteps; i++)
{
betas[i] = betas[i] * betas[i];
}
return betas;
}
float *CosineBetaSchedule(int timeSteps)
{
float s = 0.008;
int steps = timeSteps + 1;
float *temporary = linspace(0, timeSteps, steps);
float *betas = calloc(timeSteps, sizeof(float));
// Step 2: Compute alphas_cumprod using the cosine schedule formula
float *alphasCumProd = calloc(steps, sizeof(float));
float factor = PI * 0.5 / (1 + s);
for(int i = 0; i < steps; i++)
{
float term = ((temporary[i] / timeSteps) + s) * factor;
alphasCumProd[i] = pow(cos(term), 2);
}
// Step 3: Normalize alphas_cumprod by dividing by alphas_cumprod[0]
float firstAlpha = alphasCumProd[0];
for(int i = 0; i < steps; i++)
{
alphasCumProd[i] /= firstAlpha;
}
// Step 4: Compute betas
for(int i = 1; i < steps; i++)
{
float beta = 1 - (alphasCumProd[i] / alphasCumProd[i - 1]);
// Clip beta to the range [MIN_BETA, MAX_BETA]
if(beta < MIN_BETA) beta = MIN_BETA;
if(beta > MAX_BETA) beta = MAX_BETA;
betas[i - 1] = beta;
}
free(temporary);free(alphasCumProd);
return betas;
}
float *SigmoidBetaSchedule(int timeSteps)
{
float start = 0.0001;
float end = 0.02;
float *betas = linspace(-6, 6, timeSteps);
for(int i = 0; i < timeSteps; i++)
{
betas[i] = DiffusionInternalSigmoid(betas[i]) * (end - start) + start;
}
return betas;
}
void TestLinearSpace()
{
int arrayLength = 40;
float start = 25.7f;
float end = 27.9f;
float* result = linspace(start, end, arrayLength);
assert(result != NULL);
for(int i = 0; i < arrayLength; i++)
{
printf("%f ", result[i]);
}
free(result);
}
void ComputeDiffusionParameters(int timeSteps, float *betas,
float *alphas, float *alphasCumulativeProduct, float *alphasCumulativeProductPrevious,
float *squareRootReciprocalAlphas, float *squareRootAlphasCumulativeProduct,
float *squareRootMinusOneAlphasCumulativeProduct, float *posteriorVariance
)
{
//Step 1 : Define alphas
for(int i = 0; i < timeSteps; i++)
{
alphas[i] = 1.0f - betas[i];
//Avoid division by 0 in later steps
assert(alphas[i] != 0.0f);
}
//Step 2: Compute alphas cumulative product
alphasCumulativeProduct[0] = alphas[0];
for(int i = 1; i < timeSteps; i++)
{
alphasCumulativeProduct[i] = alphasCumulativeProduct[i-1] * alphas[i];
}
//Step 3: Compute alphas cumulative product previous
alphasCumulativeProductPrevious[0] = 1.0f;
for(int i = 1; i < timeSteps; i++)
{
alphasCumulativeProductPrevious[i] = alphasCumulativeProduct[i-1];
}
//Step 4 : Compute squareRoot of the Reciprocal of Alphas
for(int i = 0; i < timeSteps; i++)
{
squareRootReciprocalAlphas[i] = sqrtf(1.0f / alphas[i]);
}
//Step 5 : Compute squareRoot of alphasCumulativeProduct
for(int i = 0; i < timeSteps; i++)
{
squareRootAlphasCumulativeProduct[i] = sqrtf(alphasCumulativeProduct[i]);
}
//Step 6 : Compute squareRoot of MinusOne AlphasCumulativeProduct
for(int i = 0; i < timeSteps; i++)
{
squareRootMinusOneAlphasCumulativeProduct[i] = sqrtf(1.0f - alphasCumulativeProduct[i]);
}
//Step 7 : Compute Posterior Variance
for(int i = 0; i < timeSteps; i++)
{
if(1.0f - alphasCumulativeProduct[i] == 0.0f)
{
posteriorVariance[i] = 0.0f;
}
else
{
posteriorVariance[i] = betas[i] * (1.0f - alphasCumulativeProductPrevious[i]) / (1.0f - alphasCumulativeProduct[i]);
}
}
}
float random_uniform(float min, float max) {
return min + (max - min) * ((float)rand() / RAND_MAX);
}
float random_gaussian(float mean, float std_dev) {
float u1 = ((float)rand() + 1.0f) / ((float)RAND_MAX + 1.0f);
float u2 = ((float)rand() + 1.0f) / ((float)RAND_MAX + 1.0f);
return mean + std_dev * sqrtf(-2.0f * logf(u1)) * cosf(2.0f * M_PI * u2);
}
void GenerateSwissRoll2D(int arrayLength, float noise, float *timeStep, float *xCoordinates, float *yCoordinates)
{
for(int i = 0; i < arrayLength; i++)
{
timeStep[i] = (3 * M_PI / 2) * (1 + 2 * random_uniform(0, 1));
xCoordinates[i] = timeStep[i] * cosf(timeStep[i]) + random_gaussian(0, noise);
yCoordinates[i] = timeStep[i] * sinf(timeStep[i]) + random_gaussian(0, noise);
}
}
void AddDiffusionParameters(int diffusionStep, int totalTimeSteps, int swissRollLength, float *actualNoise, float *original, float *result, float *squareRootAlphasCumulativeProduct, float *squareRootMinusOneAlphasCumulativeProduct)
{
assert(diffusionStep > -1);
assert(diffusionStep < totalTimeSteps);
float sqrt_alphas_cumprod_t = squareRootAlphasCumulativeProduct[diffusionStep];
float sqrt_one_minus_alphas_cumprod_t = squareRootMinusOneAlphasCumulativeProduct[diffusionStep];
for(int i = 0; i < swissRollLength; i++)
{
float noise = random_uniform(0.0f, 1.0f);
actualNoise[i] = noise;
result[i] = sqrt_alphas_cumprod_t * original[i] + sqrt_one_minus_alphas_cumprod_t * noise;
}
}
void TestLinearBetas()
{
int timeSteps = 40;
float *betas = LinearBetaSchedule(timeSteps);
assert(betas != NULL);
for(int i = 0; i < timeSteps; i++)
{
printf("%f ", betas[i]);
}
free(betas);
}
void TestQuadraticBetas()
{
int timeSteps = 4;
float *betas = QuadraticBetaSchedule(timeSteps);
assert(betas != NULL);
for(int i = 0; i < timeSteps; i++)
{
printf("%f ", betas[i]);
}
free(betas);
}
void TestCosineBetas()
{
int timeSteps = 40;
float *betas = CosineBetaSchedule(timeSteps);
assert(betas != NULL);
for(int i = 0; i < timeSteps; i++)
{
printf("%f ", betas[i]);
}
free(betas);
}
void TestSigmoidBetas()
{
int timeSteps = 4;
float *betas = SigmoidBetaSchedule(timeSteps);
assert(betas != NULL);
for(int i = 0; i < timeSteps; i++)
{
printf("%f ", betas[i]);
}
free(betas);
}
void Test2DSwissRoll()
{
float noise = 0.0f;
int arrayLength = 10000;
float *timeStep = calloc(arrayLength, sizeof(float));
float *xCoordinates = calloc(arrayLength, sizeof(float));
float *yCoordinates = calloc(arrayLength, sizeof(float));
GenerateSwissRoll2D(arrayLength, noise, timeStep, xCoordinates, yCoordinates);
FILE *fp = fopen("swissroll_2d.txt", "w");assert(fp != NULL);
for(int i = 0; i < arrayLength; i++)
{
fprintf(fp,"%f %f\n", xCoordinates[i], yCoordinates[i]);
}
fclose(fp);
free(timeStep);
free(xCoordinates);
free(yCoordinates);
}
void TestComputeDiffusionParameters()
{
int timeSteps = 10000;
float *betas = SigmoidBetaSchedule(timeSteps);
float *alphas = calloc(timeSteps, sizeof(float));
float *alphasCumulativeProduct = calloc(timeSteps, sizeof(float));
float *alphasCumulativeProductPrevious = calloc(timeSteps, sizeof(float));
float *squareRootReciprocalAlphas = calloc(timeSteps, sizeof(float));
float *squareRootAlphasCumulativeProduct = calloc(timeSteps, sizeof(float));
float *squareRootMinusOneAlphasCumulativeProduct = calloc(timeSteps, sizeof(float));
float *posteriorVariance = calloc(timeSteps, sizeof(float));
ComputeDiffusionParameters(timeSteps, betas,alphas, alphasCumulativeProduct, alphasCumulativeProductPrevious,squareRootReciprocalAlphas,squareRootAlphasCumulativeProduct,squareRootMinusOneAlphasCumulativeProduct, posteriorVariance);
float swissRollNoise = 0.0f;
int swissRollLength = 1000;
float *actualNoise = calloc(swissRollLength, sizeof(float));
float *swissRollTimeStep = calloc(swissRollLength, sizeof(float));
float *swissRollXCoordinates = calloc(swissRollLength, sizeof(float));
float *swissRollYCoordinates = calloc(swissRollLength, sizeof(float));
float *diffusionXCoordinates = calloc(swissRollLength, sizeof(float));
float *diffusionYCoordinates = calloc(swissRollLength, sizeof(float));
GenerateSwissRoll2D(swissRollLength, swissRollNoise, swissRollTimeStep, swissRollXCoordinates, swissRollYCoordinates);
//Add to X coordinates
int diffusionStep = 100;
AddDiffusionParameters(diffusionStep, timeSteps, swissRollLength, actualNoise, swissRollXCoordinates, diffusionXCoordinates, squareRootAlphasCumulativeProduct, squareRootMinusOneAlphasCumulativeProduct);
//Add to Y coordinates
AddDiffusionParameters(diffusionStep, timeSteps, swissRollLength, actualNoise, swissRollYCoordinates, diffusionYCoordinates, squareRootAlphasCumulativeProduct, squareRootMinusOneAlphasCumulativeProduct);
char diffusionFileName[50];
snprintf(diffusionFileName, sizeof(diffusionFileName), "swissroll_2d%d.txt", diffusionStep);
FILE *fp = fopen(diffusionFileName, "w");assert(fp != NULL);
for(int i = 0; i < swissRollLength; i++)
{
fprintf(fp,"%f %f\n", diffusionXCoordinates[i], diffusionYCoordinates[i]);
}
fclose(fp);
free(actualNoise);
free(diffusionXCoordinates);
free(diffusionYCoordinates);
free(swissRollTimeStep);
free(swissRollXCoordinates);
free(swissRollYCoordinates);
free(alphas);
free(alphasCumulativeProduct);
free(alphasCumulativeProductPrevious);
free(squareRootReciprocalAlphas);
free(squareRootAlphasCumulativeProduct);
free(squareRootMinusOneAlphasCumulativeProduct);
free(posteriorVariance);
free(betas);
}
int main()
{
//TestLinearSpace();
//TestLinearBetas();
//TestQuadraticBetas();
//TestCosineBetas();
//TestSigmoidBetas();
//Test2DSwissRoll();
TestComputeDiffusionParameters();
return 0;
}
import numpy as np
import matplotlib.pyplot as plt
# Load data
data = np.loadtxt("swissroll_2d100.txt")
X, Z = data[:, 0], data[:, 1] # Extract X and Z
# Scatter plot
plt.figure(figsize=(8, 6))
plt.scatter(X, Z, c=np.arctan2(Z, X), cmap="rainbow", s=5, alpha=0.7)
plt.colorbar(label="Angle")
plt.title("2D Swiss Roll Visualization")
plt.xlabel("X")
plt.ylabel("Z")
plt.axis("equal")
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment