Created
October 16, 2023 07:43
-
-
Save e-dreyer/5b09b0874decd5b87848c23c15fcaf7e to your computer and use it in GitHub Desktop.
test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% [markdown] | |
# # Channel Estimation | |
# | |
# - The multi-path channel acts as a filter on the transmitted information. | |
# - The channel impulse response **CIR** has to be estimated to enable the detection algorithms, **DFE** and **MLSE** to estimate the transmitted block or sequence of symbols. | |
# - This is made possible by transmitting a sequence of known symbols, known as **pilot symbols** or **training symbols**, in the center of the data block. | |
# - Assuming the **CIR** is time-invariant for the duration of a data block, the **CIR** can be estimated from the **pilot symbols** using a process called **least squares** or **LS** estimation. | |
# | |
# \begin{equation} | |
# S = [s_1, s_2, \dots, p_1, p_2, \dots, p_p, \dots, s_{N-1}, s_{N}] | |
# \end{equation} | |
# | |
# - The transmitted data block of length $N$ contains $P$ **pilot symbols** $p = [p_1, p_2, \dots, p_p]$, where $p_1$ starts at $t = (N/2) - (P/2) + 1$ if $N$ and $P$ are even numbers and ends at $t = (N/2) + (P/2)$. | |
# - $P$ is usually a multiple of the **CIR** length $L$. | |
# | |
# The standard transmission model for a system transmitting information through a multi-path channel is: | |
# | |
# \begin{equation} | |
# r = Cs + n | |
# \end{equation} | |
# | |
# \begin{equation} | |
# \begin{bmatrix} | |
# r_{1} \\ | |
# r_{2} \\ | |
# \vdots \\ | |
# r_{N-1} \\ | |
# r_{N} | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# C_{0} & 0 & \dots & \dots & \dots & 0 \\ | |
# C_{1} & C_{0} & 0 & \dots & \dots & 0 \\ | |
# C_{2} & C_{1} & C_{0} & 0 & \dots & 0 \\ | |
# \vdots & \ddots & \ddots & \ddots & \ddots & \vdots \\ | |
# C_{L-1} & \ddots & \ddots & \ddots & C_{0} & 0 \\ | |
# 0 & C_{L-1} & \ddots & \ddots & C_{1} & C_{0} \\ | |
# \end{bmatrix} | |
# \begin{bmatrix} | |
# s_{1} \\ | |
# s_{2} \\ | |
# \vdots \\ | |
# \vdots \\ | |
# \vdots \\ | |
# s_{N} \\ | |
# \end{bmatrix} + \bar{n} | |
# \end{equation} | |
# | |
# Which can be written as | |
# | |
# \begin{equation} | |
# r = Qc + n | |
# \end{equation} | |
# | |
# \begin{equation} | |
# \begin{bmatrix} | |
# r_{1} \\ | |
# r_{2} \\ | |
# \vdots \\ | |
# r_{N} \\ | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# s_{n} & s_{n-1} & \dots & \dots & \dots & s_{n-L+1} \\ | |
# s_{n+1} & s_{n} & s_{n-1} & \dots & \dots & s_{n-L+2} \\ | |
# s_{n+2} & s_{n+1} & s_{n} & s_{n-1} & \dots & s_{n-L+3} \\ | |
# \vdots & \vdots & \vdots & \vdots & \ddots & \vdots \\ | |
# \end{bmatrix} | |
# \begin{bmatrix} | |
# C_{0} \\ | |
# C_{1} \\ | |
# \vdots \\ | |
# C_{L-1} \\ | |
# \end{bmatrix} | |
# | |
# + \bar{n} | |
# | |
# \end{equation} | |
# | |
# If we know the values of the **pilot symbols** in $Q$ and the **received symbols** in $r$, which corresponds to the **pilot symbols** in $Q$, we can estimate $c$ using **least squares LS** estimation. **LS** estimation seeks to minimize the square of the error: | |
# | |
# \begin{equation} | |
# \epsilon = r - Qc | |
# \end{equation} | |
# | |
# This is achieved through partial differentiation and yields | |
# | |
# \begin{equation} | |
# Q^{H}Qc = Q^{H}r | |
# \end{equation} | |
# | |
# The final **LS** estimate of $c$ is | |
# | |
# \begin{equation} | |
# \tilde{c} = \frac{Q^{H}r}{Q^{H}Q} | |
# \end{equation} | |
# | |
# Which is the exact solution assuming the **pilot sequence** is chosen that $Q^{H}Q \approx kI$ where $I$ is the identity matrix. The approximate solution is given by: | |
# | |
# \begin{equation} | |
# \tilde{c} = \frac{1}{k}(Q^{H}r) | |
# \end{equation} | |
# | |
# %% [markdown] | |
# ## Example 1 | |
# | |
# \begin{equation} | |
# N = 20 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# L = 3 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# P = 3L = 9 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# s = | |
# \begin{bmatrix} | |
# s_{1} & s_{2} & s_{3} & \dots & p_{1} & p_{2} & p_{3} & p_{4} & p_{5} & p_{6} & p_{7} & p_{8} & p_{9} & \dots & s_{18} & s_{19} & s\_{20} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# $Q$ is then a $(P - L + 1) \times (L)$ matrix: | |
# | |
# \begin{equation} | |
# Q = | |
# \begin{bmatrix} | |
# p_{3} & p_{2} & p_{1} \\ | |
# p_{4} & p_{3} & p_{2} \\ | |
# p_{5} & p_{4} & p_{3} \\ | |
# p_{6} & p_{5} & p_{4} \\ | |
# p_{7} & p_{6} & p_{5} \\ | |
# p_{8} & p_{7} & p_{6} \\ | |
# p_{9} & p_{8} & p_{7} \\ | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# s_{8} & s_{7} & s_{6} \\ | |
# s_{9} & s_{8} & s_{7} \\ | |
# s_{10} & s_{9} & s_{8} \\ | |
# s_{11} & s_{10} & s_{9} \\ | |
# s_{12} & s_{11} & s_{10} \\ | |
# s_{13} & s_{12} & s_{11} \\ | |
# s_{14} & s_{13} & s_{12} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# $Q^{H}$ is then an $(L) \times (P - L + 1)$ matrix: | |
# | |
# \begin{equation} | |
# Q^{H} = | |
# \begin{bmatrix} | |
# s_{8} & s_{9} & s_{10} & s_{11} & s_{12} & s_{13} & s_{14} \\ | |
# s_{7} & s_{8} & s_{9} & s_{10} & s_{11} & s_{12} & s_{13} \\ | |
# s_{6} & s_{7} & s_{8} & s_{9} & s_{10} & s_{11} & s\_{12} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# The start and end indices are then calculated as: | |
# | |
# \begin{equation} | |
# p\_{\text{start}} = \text{floor}(N/2 - P/2 + 1) = \text{floor}(20/2 - 9/2 + 1) = 6 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# p\_{\text{stop}} = \text{floor}(N/2 + P/2) = \text{floor}(20/2 + 9/2) = 14 | |
# \end{equation} | |
# | |
# %% [markdown] | |
# ## Example 2 | |
# | |
# The parameters of the transmission are given as: | |
# | |
# \begin{equation} | |
# N = 20 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# L = 4 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# P = 2L = 8 | |
# \end{equation} | |
# | |
# The **start** and **stop** indices are calculated as: | |
# | |
# \begin{equation} | |
# p\_{\text{start}} = \text{floor}(N/2 - P/2 + 1) = \text{floor}(20/2 - 8/2 + 1) = 7 | |
# \end{equation} | |
# | |
# \begin{equation} | |
# p\_{\text{stop}} = \text{floor}(N/2 + P/2) = \text{floor}(20/2 + 8/2) = 14 | |
# \end{equation} | |
# | |
# The transmitted and received sequences are given as: | |
# | |
# \begin{equation} | |
# s = | |
# \begin{bmatrix} | |
# -1 & -1 & 1 & 1 & 1 & -1 & \textbf{-1} & \textbf{-1} & \textbf{1} & \textbf{-1} & \textbf{-1} & \textbf{-1} & \textbf{1} & \textbf{1} & 1 & -1 & 1 & -1 & 1 & -1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# \begin{equation} | |
# r = | |
# \begin{bmatrix} | |
# 0.5284 & 1.6983 & 0.9833,-0.5284,-1.6983,-0.9833 & \textbf{0.3836} & \textbf{0.0419} & \textbf{-1.6983} & \textbf{-0.9833} & \textbf{0.5284} & 1.6983 & 0.9833,-0.3836,-0.1866 & 0.1866,-0.1866 & 0.1143 & -0.8701 & 0.2851 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# And the **pilot symbols** as: | |
# | |
# \begin{equation} | |
# p = | |
# \begin{bmatrix} | |
# -1 & -1 & 1 & -1 & -1 & -1 & 1 & 1 | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Thus $Q$ can be calculated as: | |
# | |
# \begin{equation} | |
# Q = | |
# \begin{bmatrix} | |
# 1 & 1 & -1 & -1 \\ | |
# -1 & -1 & 1 & -1 \\ | |
# -1 & -1 & -1 & 1 \\ | |
# 1 & -1 & -1 & -1 \\ | |
# 1 & 1 & -1 & -1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# and $Q^{H}Q$ as: | |
# | |
# \begin{equation} | |
# Q^{H}Q = | |
# \begin{bmatrix} | |
# 5 & 1 & -1 & -1 \\ | |
# 1 & 5 & -1 & -1 \\ | |
# -1 & -1 & 5 & 1 \\ | |
# -1 & -1 & 1 & 5 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# $r^{H}$ is then calculated as: | |
# | |
# \begin{equation} | |
# r^{H} = | |
# \begin{bmatrix} | |
# 0.3836 \\ | |
# 0.0419 \\ | |
# -1.6983 \\ | |
# -0.9833 \\ | |
# 0.5284 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# The exact solution is then calculated as | |
# | |
# \begin{equation} | |
# c\_{\text{exact}} = (Q^{H}r^{H})^{T} (Q^{H}Q)^{-1} | |
# \end{equation} | |
# | |
# And the approximate solution with $k = 5$ as | |
# | |
# \begin{equation} | |
# c\_{\text{approximate}} = (\frac{1}{k})(Q^{H}r^{H}) | |
# \end{equation} | |
# | |
# %% | |
import numpy as np | |
N = 20 | |
L = 4 | |
P = 2*L | |
s = [ -1, -1, 1, 1, 1, -1, -1, -1, 1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, -1] | |
r = [ 0.5284, 1.6983, 0.9833,-0.5284,-1.6983,-0.9833, 0.3836, 0.0419,-1.6983, -0.9833, 0.5284, 1.6983, 0.9833,-0.3836,-0.1866, 0.1866,-0.1866, 0.1143, -0.8701, 0.2851] | |
p_start = round((N/2) - (P/2) + 1) | |
p_end = round((N/2) + (P/2)) | |
pilot_symbols = s[p_start-1:p_end] | |
Q = np.matrix([pilot_symbols[index:index+L][::-1] for index in range(0, P - L + 1)]) | |
Q_H = Q.getH() | |
r_ = np.matrix([r[p_start - 1:p_start + L]]) | |
r_H = r_.getH() | |
term_1 = (Q_H*r_H) | |
term_2 = Q_H*Q | |
c_exact = term_1.transpose()*(np.linalg.inv(term_2)) | |
print("exact: {0}".format(c_exact)) | |
k = 5 | |
c_approx = (1/k)*term_1 | |
print("approximation: {0}".format(c_approx)) | |
# %% [markdown] | |
# # Orthogonal Frequency-Division Multiplexing (OFDM) | |
# | |
# - **OFDM** is a frequency-domain modulation scheme that effectively mitigates the effect of multi-path. | |
# - Unlike **MLSE** that has a computational complexity linear in the **data block length** $N$ and exponential in the **CIR** length $L$, **OFDM** makes use of matrix and vector operations that are much more efficient and provide optimal transmitted symbol sequence estimates. | |
# - **OFDM** is very sensitive to synchronization errors caused by **Doppler shifts** and it suffers from the well-known peak-to-average power ratio **PAPR** problem, which results from saturation of the receiver **ADC**. | |
# | |
# The key to **OFDM** is the cyclic prefix **CP**. $L-1$ symbols are copied from the back and added to the front as **header symbols**. | |
# | |
# A normal block is shown as | |
# | |
# \begin{equation} | |
# \begin{bmatrix} | |
# s_{1} & s_{2} & s_{3} & \dots & s_{N-L+2} & \dots & s\_{N} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# The data block with the **CP** added then yields: | |
# | |
# \begin{equation} | |
# \begin{bmatrix} | |
# s_{N-L+2} & \dots & s_{N} & s_{1} & s_{2} & s_{3} & \dots & s_{N-L+2} & \dots & s\_{N} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# If this **OFDM** data block were to be transmitted through a multi-path channel with $L = 4$, the received symbols would look as follows | |
# | |
# \begin{align*} | |
# r_{1} &= s_{1}c_{0} &+ s_{N}c_{1} &+ s_{N-1}c_{2} &+ c_{N-2}c_{3} &+ n_{1} \\ | |
# r_{2} &= s_{2}c_{0} &+ s_{1}c_{1} &+ s_{N}c_{2} &+ c_{N-1}c_{3} &+ n_{2} \\ | |
# r_{3} &= s_{3}c_{0} &+ s_{2}c_{1} &+ s_{1}c_{2} &+ c_{N}c_{3} &+ n_{3} \\ | |
# r_{4} &= s_{4}c_{0} &+ s_{3}c_{1} &+ s_{2}c_{2} &+ c_{1}c_{3} &+ n_{4} \\ | |
# \vdots & & \vdots & \vdots & \vdots & \vdots \\ | |
# r_{N} &= s_{N}c_{0} &+ s_{N-1}c_{1} &+ s_{N-2}c_{2} &+ s_{N-3}c_{3} &+ n_{N} \\ | |
# \end{align*} | |
# | |
# %% [markdown] | |
# ## Example: Actual with CP and conventional channel matrix | |
# | |
# Transmission parameters are given as | |
# | |
# \begin{align*} | |
# L &= 2 \\ | |
# N &= 4 \\ | |
# C &= | |
# \begin{bmatrix} | |
# 0.75 & -0.24 | |
# \end{bmatrix} \\ | |
# \end{align*} | |
# | |
# And the $F$ matrix as | |
# | |
# \begin{equation} | |
# F = | |
# \begin{bmatrix} | |
# 1 & 1 & 1 & 1 \\ | |
# 1 & j & -1 & -j \\ | |
# 1 & -1 & 1 & -1 \\ | |
# 1 & -j & -1 & j \\ | |
# \end{bmatrix}/N | |
# \end{equation} | |
# | |
# \begin{equation} | |
# S = | |
# \begin{bmatrix} | |
# 1 & -1 & -1 & 1 | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Taking the inverse Fourier of S | |
# | |
# \begin{equation} | |
# s = F^{H}S = | |
# \begin{bmatrix} | |
# 0.0 - 0.5j & 0.0 +0.0j & 0.5 + 0.5j & 0.0 +0.0j & 0.5 - 0.5j \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Adding the **CP** | |
# | |
# \begin{equation} | |
# s = | |
# \begin{bmatrix} | |
# 0.5 - 0.5j & 0.0 + 0.0j & 0.5 + 0.5j & 0.0 + 0.0j & 0.5 - 0.5j \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Channel matrix of the actual channel | |
# | |
# \begin{equation} | |
# C\_ = | |
# \begin{bmatrix} | |
# 0.75 & 0 & 0 & 0 \\ | |
# -0.24 & 0.75 & 0 & 0 \\ | |
# 0 & -0.24 & 0.75 & 0 \\ | |
# 0 & 0 & -0.24 & 0.75 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# The transmission model with no noise | |
# | |
# \begin{equation} | |
# r = Cs' + n, (n = 0) | |
# \end{equation} | |
# | |
# The received symbols | |
# | |
# \begin{equation} | |
# r = | |
# \begin{bmatrix} | |
# 0.375 - j0.375 & -0.12 + j0.12 & 0.375 + j0.375 & -0.12 - j0.12 & 0.375 - j0.375 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Remove the **CP** | |
# | |
# \begin{equation} | |
# r = | |
# \begin{bmatrix} | |
# -0.12 + j0.12 & 0.375 + j0.375 & -0.12 - j0.12 & 0.375 - j0.375 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Get the Fourier of r | |
# | |
# \begin{equation} | |
# R = Fr^{T} = | |
# \begin{bmatrix} | |
# 0.1275 + j0.0 & -0.1875 + j0.06 & -0.2475 + j0.0 & 0.1875 + j0.06 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Calculate the Fourier of CIR, padded with zeros | |
# | |
# \begin{equation} | |
# \Alpha = F | |
# \begin{bmatrix} | |
# c & 0 & 0 | |
# \end{bmatrix}^{T} | |
# = | |
# \begin{bmatrix} | |
# 0.1275 + j0.0 & 0.1875 - j0.06 & 0.2475 + j0.0 & 0.1875 + j0.06 \ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Calculate the estimate of S | |
# | |
# \begin{equation} | |
# S\_{\text{est}} = \frac{R}{\Delta} = | |
# \begin{bmatrix} | |
# 1 & -1 & -1 & 1 | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# %% | |
import numpy as np | |
L = 2 | |
N = 4 | |
c = [0.75, -0.24] | |
F: np.matrix = np.matrix([ | |
[1, 1, 1, 1], | |
[1, 1j, -1, -1j], | |
[1, -1, 1, -1], | |
[1, -1j, -1, 1j], | |
])/N | |
S = [1, -1, -1, 1] | |
# Take inverse Fourier of S | |
s = np.matmul(F.getH(), S) | |
# Add CP symbols to the start of the list | |
s_cp = [s.tolist()[0][-1]] + s.tolist()[0] | |
# Construct C | |
C_ = list() | |
temp = c + [0] * (L + 1) | |
for i in range(0, N + 1): | |
C_.append(temp) | |
temp = [0] + temp[:-1] | |
C_ = np.matrix(C_).T | |
# transmission model with no noise | |
r = np.matmul(C_, s_cp) | |
# Remove CP | |
r = np.matrix(r.tolist()[0][1:]) | |
# Get Fourier of r | |
R = np.matmul(F, r.T) | |
print(f'{R.tolist()=}\n') | |
# Calculate Fourier of CIR, padded with zeros | |
D = np.matmul(F, c + [0] * (N - L)) | |
print(f'{D.tolist()=}\n') | |
# Calculate estimate of S | |
s_est = np.diag(R/D) | |
print(f'{s_est.tolist()=}\n') | |
# %% | |
import numpy as np | |
L = 4 | |
N = 8 | |
c = [1.3475 - 1.0709j,-1.2653 + 0.6229j, 0.4440 + 0.1800j, 0.2213 - 0.0110j] | |
F: np.matrix = np.matrix([[1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j], | |
[1.0000 + 0.0000j, 0.7071 - 0.7071j, 0.0000 - 1.0000j,-0.7071 - 0.7071j,-1.0000 + 0.0000j,-0.7071 + 0.7071j, 0.0000 + 1.0000j, 0.7071 + 0.7071j], | |
[1.0000 + 0.0000j, 0.0000 - 1.0000j,-1.0000 + 0.0000j, 0.0000 + 1.0000j, 1.0000 + 0.0000j, 0.0000 - 1.0000j,-1.0000 + 0.0000j, 0.0000 + 1.0000j], | |
[1.0000 + 0.0000j,-0.7071 - 0.7071j, 0.0000 + 1.0000j, 0.7071 - 0.7071j,-1.0000 + 0.0000j, 0.7071 + 0.7071j, 0.0000 - 1.0000j,-0.7071 + 0.7071j], | |
[1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j], | |
[1.0000 + 0.0000j,-0.7071 + 0.7071j, 0.0000 - 1.0000j, 0.7071 + 0.7071j,-1.0000 + 0.0000j, 0.7071 - 0.7071j, 0.0000 + 1.0000j,-0.7071 - 0.7071j], | |
[1.0000 + 0.0000j, 0.0000 + 1.0000j,-1.0000 + 0.0000j, 0.0000 - 1.0000j, 1.0000 + 0.0000j, 0.0000 + 1.0000j,-1.0000 + 0.0000j, 0.0000 - 1.0000j], | |
[1.0000 + 0.0000j, 0.7071 + 0.7071j, 0.0000 + 1.0000j,-0.7071 + 0.7071j,-1.0000 + 0.0000j,-0.7071 - 0.7071j, 0.0000 - 1.0000j, 0.7071 - 0.7071j]] | |
)/N | |
# S = [1, -1, -1, 1] | |
# Take inverse Fourier of S | |
# s = np.matmul(F.getH(), S) | |
# Add CP symbols to the start of the list | |
# s_cp = [s.tolist()[0][-1]] + s.tolist()[0] | |
# Construct C | |
C_ = list() | |
temp = c + [0] * (L + 1) | |
for i in range(0, N + 1): | |
C_.append(temp) | |
temp = [0] + temp[:-1] | |
C_ = np.matrix(C_).T | |
# transmission model with no noise | |
r = np.matrix([-2.8907 - 2.3466j, 4.3470 + 0.7666j, 2.2551 - 4.5870j, 1.5209 - 1.5624j, 0.3607 + 0.5425j,-0.9059 - 2.8467j,-0.6998 - 3.0052j,0])#np.matmul(C_, s_cp) | |
# Remove CP | |
# r = np.matrix(r.tolist()[0][1:]) | |
# Get Fourier of r | |
R = np.matmul(F, r.T) | |
print(f'{R.tolist()=}\n') | |
# Calculate Fourier of CIR, padded with zeros | |
D = np.matmul(F, c + [0] * (N - L)) | |
print(f'{D.tolist()=}\n') | |
# Calculate estimate of S | |
s_est = np.diag(R/D) | |
print(f'{s_est.tolist()=}\n') | |
# %% | |
import numpy as np | |
L = 4 | |
N = 8 | |
c = [1.3475 - 1.0709j,-1.2653 + 0.6229j, 0.4440 + 0.1800j, 0.2213 - 0.0110j] | |
F: np.matrix = np.matrix( | |
[[1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j, 1.0000 + 0.0000j], | |
[1.0000 + 0.0000j, 0.7071 - 0.7071j, 0.0000 - 1.0000j,-0.7071 - 0.7071j,-1.0000 + 0.0000j,-0.7071 + 0.7071j, 0.0000 + 1.0000j, 0.7071 + 0.7071j], | |
[1.0000 + 0.0000j, 0.0000 - 1.0000j,-1.0000 + 0.0000j, 0.0000 + 1.0000j, 1.0000 + 0.0000j, 0.0000 - 1.0000j,-1.0000 + 0.0000j, 0.0000 + 1.0000j], | |
[1.0000 + 0.0000j,-0.7071 - 0.7071j, 0.0000 + 1.0000j, 0.7071 - 0.7071j,-1.0000 + 0.0000j, 0.7071 + 0.7071j, 0.0000 - 1.0000j,-0.7071 + 0.7071j], | |
[1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j, 1.0000 + 0.0000j,-1.0000 + 0.0000j], | |
[1.0000 + 0.0000j,-0.7071 + 0.7071j, 0.0000 - 1.0000j, 0.7071 + 0.7071j,-1.0000 + 0.0000j, 0.7071 - 0.7071j, 0.0000 + 1.0000j,-0.7071 - 0.7071j], | |
[1.0000 + 0.0000j, 0.0000 + 1.0000j,-1.0000 + 0.0000j, 0.0000 - 1.0000j, 1.0000 + 0.0000j, 0.0000 + 1.0000j,-1.0000 + 0.0000j, 0.0000 - 1.0000j], | |
[1.0000 + 0.0000j, 0.7071 + 0.7071j, 0.0000 + 1.0000j,-0.7071 + 0.7071j,-1.0000 + 0.0000j,-0.7071 - 0.7071j, 0.0000 - 1.0000j, 0.7071 - 0.7071j]] | |
)/N | |
S = [-2.8907 - 2.3466j, 4.3470 + 0.7666j, 2.2551 - 4.5870j, 1.5209 - 1.5624j, 0.3607 + 0.5425j,-0.9059 - 2.8467j,-0.6998 - 3.0052j, 0] | |
# Take inverse Fourier of S | |
s = np.matmul(F.getH(), S) | |
# Add CP symbols to the start of the list | |
s_cp = [s.tolist()[0][-1]] + s.tolist()[0] | |
# Construct C | |
C_ = list() | |
temp = c + [0] * (L + 1) | |
for i in range(0, N + 1): | |
C_.append(temp) | |
temp = [0] + temp[:-1] | |
C_ = np.matrix(C_).T | |
# transmission model with no noise | |
r = np.matmul(C_, s_cp) | |
# Remove CP | |
r = np.matrix(r.tolist()[0][1:]) | |
# Get Fourier of r | |
R = np.matmul(F, r.T) | |
print(f'{R.tolist()=}\n') | |
# Calculate Fourier of CIR, padded with zeros | |
D = np.matmul(F, c + [0] * (N - L)) | |
print(f'{D.tolist()=}\n') | |
# Calculate estimate of S | |
s_est = np.diag(R/D) | |
print(f'{s_est.tolist()=}\n') | |
# %% [markdown] | |
# # Linear Blocks | |
# | |
# - Error correction coding **ECC** or error control coding is the process of encoding information before modulation and transmission in the transmitter and decoding the received information in the receiver after demodulation and detection, in order to correct errors that may have been introduced during. | |
# - The encoder adds redundant information to the source information in a controlled fashion, and the decoder uses this redundant information to help correct errors. | |
# - The result is that errors are reduced and the same throughput can be achieved at lower transmit power or **SNR** compared to an uncoded system. | |
# - Linear block codes are the most basic error correction codes | |
# | |
# ## Encoding | |
# | |
# Encoding works by multiplying the source bits with a generator matrix. For an **AWGN** channel with no multi-path: | |
# | |
# \begin{equation} | |
# c = sG + n | |
# \end{equation} | |
# | |
# Where $c$ is the codeword, $s$ is the source information, $G$ is the generator matrix and $n$ is the **AWGN** | |
# | |
# The generator matrix for a rate $R_{c} = \frac{k}{n}$ block code can be expressed as: | |
# | |
# \begin{equation} | |
# G = | |
# \begin{bmatrix} | |
# I_{k \times k} & P | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# 1 & \dots & 0 & p_{11} \dots & p_{1(n-k)} \\ | |
# \vdots & \ddots & \vdots & \vdots & \ddots & \vdots \\ | |
# 0 & \dots 1 & p_{k1} & \dots & p\_{n(n-k)} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Where $I_{k \times k}$ is a $k \times k$ identity matrix and $P$ is the parity matrix | |
# | |
# ## Decoding | |
# | |
# Decoding works by calculating the syndrome vector $z$ and using the result to search for the error positions. The syndrome vector is calculate by | |
# | |
# \begin{equation} | |
# z = \tilde{c}H^{T} | |
# \end{equation} | |
# | |
# Where $\tilde{c}$ is the estimated codeword after detection and symbol-to-bit mapping and $H$ is the parity check matrix: | |
# | |
# \begin{equation} | |
# H = | |
# \begin{bmatrix} | |
# P^{T} & I\_{(n-k) \times (n-k)} \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Since $GH^{T} = 0$, $G$ and $H$ are orthogonal. Therefore, any $c$ generated using $G$ is orthogonal to any row in $H$. Therefore, if no errors were introduced during transmission $(\tilde{c} = c)$: | |
# | |
# \begin{equation} | |
# z = \tilde{c}H^{T} = 0 | |
# \end{equation} | |
# | |
# Also, when errors were introduced and $\tilde{c} \neq c$ | |
# | |
# \begin{equation} | |
# z = \tilde{c}H^{T} \neq 0 | |
# \end{equation} | |
# | |
# When $z \neq 0$ we have to find it in the rows of $H^{T}$. If not found, we start looking for rows in $H^{T}$ that give $z$ when **XORed**. The columns in question indicate the positions of the errors. To correct the errors we only need to flip the bits in $\tilde{c} | |
# | |
# The number of errors that a given code can correct is determined by | |
# | |
# \begin{equation} | |
# t = \frac{d\_{\text{min}} - 1}{2} | |
# \end{equation} | |
# | |
# Where $d_{\text{min}}$ is the minimum distance of the code or the number of bits by which any codeword $c$ generated by the code differs. | |
# | |
# %% [markdown] | |
# ## Example | |
# | |
# \begin{align*} | |
# k &= 4 \\ | |
# n &= 7 \\ | |
# R\_{c} &= \frac{k}{n} = \frac{4}{7} \\ | |
# \end{align*} | |
# | |
# \begin{equation} | |
# G = | |
# \begin{bmatrix} | |
# 1 & 0 & 0 & 0 & 0 & 1 & 1 \\ | |
# 0 & 1 & 0 & 0 & 1 & 0 & 1 \\ | |
# 0 & 0 & 1 & 0 & 1 & 1 & 0 \\ | |
# 0 & 0 & 0 & 1 & 1 & 1 & 1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# \begin{equation} | |
# H = | |
# \begin{bmatrix} | |
# 0 & 1 & 1 & 1 & 1 & 0 & 0 \\ | |
# 1 & 0 & 1 & 1 & 0 & 1 & 0 \\ | |
# 1 & 1 & 0 & 1 & 0 & 0 & 1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# The source bits are given as | |
# | |
# \begin{equation} | |
# s = | |
# \begin{bmatrix} | |
# 0 & 1 & 0 & 1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Encoding is calculated as | |
# | |
# \begin{equation} | |
# c = Gs = | |
# \begin{bmatrix} | |
# 0 & 1 & 0 & 1 & 0 & 1 & 0 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# After transmission, demodulation and detection | |
# | |
# \begin{equation} | |
# c\_{\text{est}} = | |
# \begin{bmatrix} | |
# 0 & 1 & 1 & 1 & 0 & 1 & 0 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Decoding | |
# | |
# \begin{equation} | |
# z = c\_{\text{est}} H^{T} = | |
# \begin{bmatrix} | |
# 1 & 1 & 0 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# Find $z$ in $H$ | |
# | |
# \begin{equation} | |
# H^{T} = | |
# \begin{bmatrix} | |
# 0 & 1 & 1 \\ | |
# 1 & 0 & 1 \\ | |
# 1 & 1 & 0 \\ | |
# 1 & 1 & 1 \\ | |
# 1 & 0 & 0 \\ | |
# 0 & 1 & 0 \\ | |
# 0 & 0 & 1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# $z$ corresponds to row 3 in $H^{T}$, therefore the error was made at position 3. To correct the error, flip the bit in $c_{\text{est}}$ at position 3 | |
# | |
# \begin{equation} | |
# c\_{\text{est}} = | |
# \begin{bmatrix} | |
# 0 & 1 & 0 & 1 & 0 & 1 & 0 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# %% | |
import numpy as np | |
from itertools import product | |
k = 4 | |
n = 7 | |
Rc = k/n | |
G = np.array([ | |
[1, 0, 0, 0, 0, 1, 1], | |
[0, 1, 0, 0, 1, 0, 1], | |
[0, 0, 1, 0, 1, 1, 0], | |
[0, 0, 0, 1, 1, 1, 1] | |
]) | |
print(f'G = \n{str(G)} \n') | |
H = np.concatenate((G[:, k:], np.identity(n-k))).T | |
print(f'H = \n{str(H)} \n') | |
s = np.array([0, 1, 0, 1]) | |
print(f's = \n{str(s)} \n') | |
c = np.dot(s, G) % 2 | |
print(f'c = \n{str(c)} \n') | |
c_est = np.array([0, 1, 1, 1, 0, 1, 0]) | |
print(f'c_est = \n{str(c_est)} \n') | |
z = np.dot(c_est, H.T) % 2 | |
print(f'z = \n{str(z)} \n') | |
H_T = H.T | |
matching_rows = np.zeros(len(c_est)) | |
found = False | |
count = 1 | |
while not found: | |
prod = list(product(range(0, len(H_T)), repeat=count)) | |
for p in prod: | |
if count == 1: | |
if (z == H_T[p[0]]).all(): | |
matching_rows[p[0]] = 1 | |
found = True | |
else: | |
result = H_T[p[0]] | |
for _p in p[1:]: | |
result = np.array([int(x) for x in np.logical_xor(result, H_T[_p], casting='same_kind')]) | |
if (z == result).all(): | |
matching_rows[p[0]] = 1 | |
matching_rows[_p] = 1 | |
found = True | |
count += 1 | |
print(f'matching_rows = \n{str(matching_rows)} \n') | |
if found: | |
c_est = np.array([int(x) for x in np.logical_xor(c_est, matching_rows, casting='same_kind')]) | |
print(f'c_est = \n{str(c_est)} \n') | |
uncoded_sequence = c_est[:len(s)] | |
print(f'uncoded_sequence = \n{str(uncoded_sequence)} \n') | |
# %% | |
from itertools import product | |
import numpy as np | |
k = 8 | |
n = 14 | |
Rc = k/n | |
G = np.array([ | |
[1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0], | |
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0], | |
[0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1], | |
[0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1], | |
[0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0], | |
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0], | |
[0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1], | |
[0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1], | |
]) | |
print(f'G = \n{str(G)} \n') | |
H = np.concatenate((G[:, k:], np.identity(n-k))).T | |
print(f'H = \n{str(H)} \n') | |
s = np.array([1, 0, 0, 1, 0, 1, 1, 1]) | |
print(f's = \n{str(s)} \n') | |
c = np.dot(s, G) % 2 | |
print(f'c = \n{str(c)} \n') | |
c_est = np.array([1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0]) | |
print(f'c_est = \n{str(c_est)} \n') | |
z = np.dot(c_est, H.T) % 2 | |
print(f'z = \n{str(z)} \n') | |
H_T = H.T | |
matching_rows = np.zeros(len(c_est)) | |
found = False | |
count = 1 | |
while not found: | |
prod = list(product(range(0, len(H_T)), repeat=count)) | |
for p in prod: | |
if count == 1: | |
if (z == H_T[p[0]]).all(): | |
matching_rows[p[0]] = 1 | |
found = True | |
else: | |
result = H_T[p[0]] | |
for _p in p[1:]: | |
result = np.array([int(x) for x in np.logical_xor(result, H_T[_p], casting='same_kind')]) | |
if (z == result).all(): | |
matching_rows[p[0]] = 1 | |
matching_rows[_p] = 1 | |
found = True | |
count += 1 | |
print(f'matching_rows = \n{str(matching_rows)} \n') | |
if found: | |
c_est = np.array([int(x) for x in np.logical_xor(c_est, matching_rows, casting='same_kind')]) | |
print(f'c_est = \n{str(c_est)} \n') | |
uncoded_sequence = c_est[:len(s)] | |
print(f'uncoded_sequence = \n{str(uncoded_sequence)} \n') | |
# %% | |
k = 10 | |
n = 25 | |
Rc = k/n | |
#generator matrix | |
G = np.array([ | |
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1], | |
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0], | |
[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0], | |
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1], | |
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0], | |
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0], | |
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0], | |
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0], | |
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0], | |
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0], | |
]) | |
print(f'G = \n{str(G)} \n') | |
H = np.concatenate((G[:, k:], np.identity(n-k))).T | |
print(f'H = \n{str(H)} \n') | |
s = np.array([1, 0, 1, 0, 1, 1, 1, 1, 0, 1]) | |
print(f's = \n{str(s)} \n') | |
c = np.dot(s, G) % 2 | |
print(f'c = \n{str(c)} \n') | |
c_est = np.array([0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1]) | |
print(f'c_est = \n{str(c_est)} \n') | |
z = np.dot(c_est, H.T) % 2 | |
print(f'z = \n{str(z)} \n') | |
H_T = H.T | |
matching_rows = np.zeros(len(c_est)) | |
found = False | |
count = 1 | |
while not found: | |
prod = list(product(range(0, len(H_T)), repeat=count)) | |
for p in prod: | |
if count == 1: | |
if (z == H_T[p[0]]).all(): | |
matching_rows[p[0]] = 1 | |
found = True | |
else: | |
result = H_T[p[0]] | |
for _p in p[1:]: | |
result = np.array([int(x) for x in np.logical_xor(result, H_T[_p], casting='same_kind')]) | |
if (z == result).all(): | |
matching_rows[p[0]] = 1 | |
matching_rows[_p] = 1 | |
found = True | |
count += 1 | |
print(f'matching_rows = \n{str(matching_rows)} \n') | |
if found: | |
c_est = np.array([int(x) for x in np.logical_xor(c_est, matching_rows, casting='same_kind')]) | |
print(f'c_est = \n{str(c_est)} \n') | |
uncoded_sequence = c_est[:len(s)] | |
print(f'uncoded_sequence = \n{str(uncoded_sequence)} \n') | |
# %% | |
# Class test | |
k = 7 | |
n = 14 | |
Rc = k/n | |
#generator matrix | |
G = np.array([ | |
[1,0,0,0,0,0,0,1,0,1,1,1,1,1], | |
[0,1,0,0,0,0,0,1,0,1,0,1,0,1], | |
[0,0,1,0,0,0,0,1,1,1,0,0,0,1], | |
[0,0,0,1,0,0,0,1,1,1,0,0,0,0], | |
[0,0,0,0,1,0,0,1,1,0,0,0,0,0], | |
[0,0,0,0,0,1,0,1,1,1,0,1,1,0], | |
[0,0,0,0,0,0,1,0,1,1,0,0,0,1]]) | |
print(f'G = \n{str(G)} \n') | |
H = np.concatenate((G[:, k:], np.identity(n-k))).T | |
print(f'H = \n{str(H)} \n') | |
# s = np.array([1, 0, 1, 0, 1, 1, 1, 1, 0, 1]) | |
# print(f's = \n{str(s)} \n') | |
# c = np.dot(s, G) % 2 | |
# print(f'c = \n{str(c)} \n') | |
c_est = np.array([1,1,0,1,1,0,1,1,0,0,1,0,0,1]) | |
print(f'c_est = \n{str(c_est)} \n') | |
z = np.dot(c_est, H.T) % 2 | |
print(f'z = \n{str(z)} \n') | |
H_T = H.T | |
matching_rows = np.zeros(len(c_est)) | |
found = False | |
count = 1 | |
while not found: | |
prod = list(product(range(0, len(H_T)), repeat=count)) | |
for p in prod: | |
if count == 1: | |
if (z == H_T[p[0]]).all(): | |
matching_rows[p[0]] = 1 | |
found = True | |
else: | |
result = H_T[p[0]] | |
for _p in p[1:]: | |
result = np.array([int(x) for x in np.logical_xor(result, H_T[_p], casting='same_kind')]) | |
if (z == result).all(): | |
matching_rows[p[0]] = 1 | |
matching_rows[_p] = 1 | |
found = True | |
count += 1 | |
print(f'matching_rows = \n{str(matching_rows)} \n') | |
if found: | |
c_est = np.array([int(x) for x in np.logical_xor(c_est, matching_rows, casting='same_kind')]) | |
print(f'c_est = \n{str(c_est)} \n') | |
uncoded_sequence = c_est[:len(s)] | |
print(f'uncoded_sequence = \n{str(uncoded_sequence)} \n') | |
# %% [markdown] | |
# # Convolutional Codes | |
# | |
# - Convolutional codes are powerful error correction codes able to correct burst errors prevalent in wireless communication systems, when "deep fades" occur as a result of destructive interference of radio waves. | |
# - Encoding works by iteratively encoding a block of uncoded bits by shifting the bits through a shift register, while the contents of the shift register are summed modulo 2, as determined by the encoder connections. | |
# - Decoding is performed using the MLSE algorithm, similar to how sequence detection in the presence of multi-path is performed, albeit with a different cost function. | |
# | |
# # Encoding | |
# | |
# - For a rate of $R_{c} = \frac{n}{k} = \frac{1}{2}$ convolutional encoder, with constraint length $K = 2$ and $g = [2, 3]$. The bits are shifted into the encoder from left to right, $n$ bits at a time. | |
# - For every bit shifted in at time $t$, $k$ bits are produced at the output, namely $c_{t}^{(1)}$ and $c_{t}^{(2)}$. | |
# - The constraint length $K$ is the number of shift register elements, which also determines the memory in the system $K - 1$. | |
# - Higher memory results in more complex decoding $2^{K-1}$ but better decoding performance in terms of the number of errors corrected. | |
# | |
# - Each encoder has a state diagram associated with it, which is used in the decoding process. The state diagram always has $2^{K-1}$ states, which represent the leftmost $K-1$ elements in the state register. The state diagram encodes the following input-output relationships: | |
# | |
# | State | $s_{t}$ | $c_{t}^{(1)}$ | $c_{t}^{(2)}$ | | |
# | ----- | ------- | ------------- | ------------- | | |
# | 0 | 0 | 0 | 0 | | |
# | 0 | 1 | 1 | 1 | | |
# | 1 | 0 | 0 | 1 | | |
# | 1 | 1 | 1 | 0 | | |
# | |
# - Encoding the bit stream of length $N_{u} = 4$ assuming that the encoder starts in the all-zero state, we also have to append $K-1$ 0s to drive the encoder back to the all zero state. | |
# | |
# \begin{equation} | |
# s = | |
# \begin{bmatrix} | |
# s_{1} & s_{2} & s_{3} & s_{4} & 0 \\ | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# 1 & 1 & 0 & 1 & 0 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# \begin{equation} | |
# c = | |
# \begin{bmatrix} | |
# c_{1}^{(1)} & c_{1}^{(2)} & c_{2}^{(1)} & c_{2}^{(2)} & c_{3}^{(1)} & c_{3}^{(2)} & c_{4}^{(1)} & c_{4}^{(2)} & c_{5}^{(1)} & c_{5}^{(2)} \\ | |
# \end{bmatrix} | |
# = | |
# \begin{bmatrix} | |
# 1 & 1 & 1 & 0 & 0 & 0 & 1 & 1 & 1 & 0 & 1 \\ | |
# \end{bmatrix} | |
# \end{equation} | |
# | |
# ## Decoding | |
# | |
# - The Viterbi MLSE algorithm is used to decode convolutional codes. | |
# - After transmitting the encoded information through a wireless communication channel, the detector produces and estimate $c$ or $\tilde{c}$, which is used in the decoder to find the most probable transmitted sequence of bits, while correcting some errors. | |
# - A different cost function is however used | |
# | |
# \begin{align*} | |
# \Delta_{t}^{(0 \rightarrow 0)} &= |\tilde{c}_{t}^{(1)} - c_{t}^{(1)}|^{2} + |\tilde{c}_{t}^{(2)} - c_{t}^{(2)}|^{2} \\ | |
# \Delta_{t}^{(0 \rightarrow 1)} &= |\tilde{c}_{t}^{(1)} - c_{t}^{(1)}|^{2} + |\tilde{c}_{t}^{(2)} - c_{t}^{(2)}|^{2} \\ | |
# \Delta_{t}^{(1 \rightarrow 0)} &= |\tilde{c}_{t}^{(1)} - c_{t}^{(1)}|^{2} + |\tilde{c}_{t}^{(2)} - c_{t}^{(2)}|^{2} \\ | |
# \Delta_{t}^{(1 \rightarrow 1)} &= |\tilde{c}_{t}^{(1)} - c_{t}^{(1)}|^{2} + |\tilde{c}_{t}^{(2)} - c_{t}^{(2)}|^{2} \\ | |
# \end{align*} | |
# | |
# %% | |
from collections import deque | |
from typing import List, Union, Tuple | |
from dataclasses import dataclass, field | |
import numpy as np | |
BIT_SEQUENCE_TYPE = List[int] | |
SYMBOL_SEQUENCE_TYPE = List[complex] | |
BIT_TO_SYMBOL_MAP_TYPE = List[ List[ Union[ complex, List[int] ] ] ] | |
SYMBOL_BLOCKS_TYPE = List[ List[complex] ] | |
CHANNEL_IMPULSE_RESPONSE_TYPE = List[complex] | |
RANDOM_VALUES_SYMBOLS_TYPE = List[ List[ List[float] ] ] | |
RANDOM_VALUES_CIR_TYPE = List[ List[ List[float] ] ] | |
NOISY_SYMBOL_SEQUENCE_TYPE = List[ List[complex] ] | |
SER_TYPE = Union[float, None] | |
BER_TYPE = Union[float, None] | |
@dataclass(unsafe_hash=True) | |
class Node: | |
# Time of the symbol | |
t: int = field(init=True, hash=True) | |
# Symbol value | |
symbol: complex = field(init=True, hash=True) | |
# Delta value | |
delta: float = field(init=False, default=0.0, hash=True) | |
# Alpha value | |
alpha: float = field(init=False, default=0.0, hash=True) | |
# Children of the node | |
children: List['Node'] = field(init=False, default_factory=lambda: list(), hash=False) | |
# The symbol that was received at the Node, for testing and debugging purposes | |
received_symbol: complex = field(init=True, hash=True) | |
def __str__(self) -> str: | |
output_string = f'{"": >{self.t*4}}└─ [{self.t}] [Δ = {self.delta: <.5f}] [α = {self.alpha: <.5f}] [r = {self.received_symbol: <.5f}] [s = {self.symbol: >5.5f}]\n' | |
for child in self.children: | |
output_string += f'{str(child)}' | |
return output_string | |
def __repr__(self) -> str: | |
if self.t != 0: | |
return f'[{self.t:>4}] [Δ = {self.delta: <.5f}] [∑ = {self.alpha: <.5f}] [r = {self.received_symbol: <.5f}] [s = {self.symbol: >5.5f}]' | |
else: | |
return f'[{self.t:>4}] [Δ = {self.delta: <.5f}] [∑ = {self.alpha: <.5f}] [r = {self.received_symbol: <.5f}] [s = {self.symbol: >5.5f}]' | |
class MLSE: | |
def __init__(self, modulation_mapping: BIT_TO_SYMBOL_MAP_TYPE, c: List): | |
""" | |
Create a new MLSE tree structure | |
""" | |
# set the modulation mapping | |
self.modulation_mapping = modulation_mapping | |
self.C = c | |
self.root = None | |
def getRoutesToLeaves(self) -> List[List[Node]]: | |
""" | |
Get the routes to the leaf nodes using DFS | |
""" | |
def DFS(node: Node, current_path: List[Node], paths: List[List[Node]]): | |
""" | |
Depth-first search for the tree | |
""" | |
# If no root is set, no leaves exist | |
if not node: | |
return | |
else: | |
# Add the current node to the path | |
current_path.append(node) | |
# If node has no children it is a leaf | |
if not node.children: | |
paths.append(current_path.copy()) | |
else: | |
for _child in node.children: | |
DFS(_child, current_path, paths) | |
# Remove from the current path | |
current_path.pop() | |
if not self.root: | |
return list() | |
else: | |
# Create a variable for the paths | |
paths: List[List[Node]] = list() | |
# Perform DFS | |
DFS(self.root, [], paths) | |
# Return paths | |
return paths | |
def getLeafNodes(self) -> List[Node]: | |
""" | |
Get the leaf nodes of the tree structure | |
""" | |
if not self.root: | |
return list() | |
else: | |
# List of the leaf nodes | |
leaf_nodes: List[Node] = list() | |
# Stack for finding leaf nodes | |
node_queue = deque() | |
node_queue.append(self.root) | |
# Loop through all of the nodes | |
while node_queue: | |
# Get last node | |
current_node = node_queue.pop() | |
# If the node has children, add it to the queue | |
if current_node.children: | |
node_queue.extendleft(current_node.children) | |
else: | |
leaf_nodes.append(current_node) | |
return leaf_nodes | |
def getTreeDepth(self): | |
""" | |
Get the depth of the tree | |
""" | |
# If no root, the depth is 0 | |
if not self.root: | |
return 0 | |
else: | |
# The depth is at least 1 | |
depth = 1 | |
current_node = self.root | |
# Loop until the node has no children | |
while current_node.children: | |
depth += 1 | |
current_node = current_node.children[0] | |
return depth | |
def calculatePathCost(self, path: List[Node]) -> float: | |
""" | |
Calculate the cost of a path | |
""" | |
return np.sum([node.delta for node in path]) | |
def getShortestPath(self) -> List[Node]: | |
""" | |
Get the shortest path | |
""" | |
if not self.root: | |
return list() | |
else: | |
shortest_path = min(self.getRoutesToLeaves(), key=lambda x: self.calculatePathCost(x)) | |
return shortest_path | |
def addPrependSymbol(self, prepend_symbol: complex) -> None: | |
""" | |
This is a helper function, which adds a symbol by setting it as the root if there is none or adding it to all leaf nodes | |
""" | |
if not self.root: | |
self.root = Node(0, prepend_symbol, received_symbol=prepend_symbol) | |
else: | |
leaf_nodes = self.getLeafNodes() | |
for leaf_node in leaf_nodes: | |
leaf_node.children = [Node(leaf_node.t + 1, prepend_symbol, received_symbol=prepend_symbol)] | |
def addAppendSymbol(self, append_symbol: complex) -> None: | |
""" | |
This is a helper function, which adds a symbol by setting it as the root if there is none or adding it to all leaf nodes | |
""" | |
if not self.root: | |
self.root = Node(0, append_symbol, received_symbol=append_symbol) | |
else: | |
leaf_nodes = self.getLeafNodes() | |
for leaf_node in leaf_nodes: | |
leaf_node.children = [Node(leaf_node.t + 1, append_symbol, received_symbol=append_symbol)] | |
def addReceivedSymbol(self, received_symbol): | |
""" | |
Add a received symbol to the tree | |
""" | |
# If no root exists, this is the start and is a known symbol | |
if not self.root: | |
self.root = Node(0, received_symbol, received_symbol=received_symbol) | |
return | |
else: | |
# If a root exists and the tree is deep enough, add a known symbol to every leaf | |
# Get the leaf nodes | |
leaf_nodes = self.getLeafNodes() | |
# Add one of every symbol in the map as a child | |
for leaf in leaf_nodes: | |
leaf.children = [Node(t = leaf.t + 1, symbol = _symbol, received_symbol=received_symbol) for (_symbol, _bits) in self.modulation_mapping] # type: ignore | |
if self.getTreeDepth() >= len(self.C): | |
# Calculate delta for every leaf | |
for path in self.getRoutesToLeaves(): | |
# Get the last node in the path, which is the leaf | |
leaf_node = path[-1] | |
# Calculate the path cost | |
_s = path[-len(self.C):] | |
sum_result = np.sum([np.power(np.abs(_s.symbol - _c),2) for _s, _c in zip(_s[::-1], self.C, strict = True)]) | |
# Update delta | |
leaf_node.delta = sum_result | |
# Check if tree is deep enough to calculate alpha and prune | |
if self.getTreeDepth() > len(self.C): | |
# Calculate the alpha for the path | |
# for path in self.getRoutesToLeaves(): | |
# path[-1].alpha = self.calculatePathCost(path) | |
# Get the parents of all the leaves as set and keep only smallest child | |
leaf_parents = set([path[-2] for path in self.getRoutesToLeaves()]) | |
# Delete all other children and only keep child with smallest alpha | |
for leaf_parent in leaf_parents: | |
for child in leaf_parent.children: | |
child.alpha = leaf_parent.alpha + child.delta | |
min_leaf = min(leaf_parent.children, key=lambda x: x.alpha) | |
max_leaf = max(leaf_parent.children, key=lambda x: x.alpha) | |
# if (min_leaf.alpha/max_leaf.alpha < 0.9): | |
leaf_parent.children = [min_leaf] | |
def printData(self): | |
if not self.root: | |
return | |
paths = self.getRoutesToLeaves() | |
for path in paths: | |
symbols = [f'{x.symbol: 3.3F}' for x in path] | |
output_string = ' -> '.join(symbols) | |
print(output_string) | |
MAP_BPSK = [ | |
[(-1 + 0j), [0]], | |
[(1 + 0j), [1]], | |
] | |
MAP_4QAM = [ | |
[( 1 + 1j)/np.sqrt(2), [0, 0]], | |
[(-1 + 1j)/np.sqrt(2), [0, 1]], | |
[(-1 - 1j)/np.sqrt(2), [1, 1]], | |
[( 1 - 1j)/np.sqrt(2), [1, 0]], | |
] | |
MAP_S = [ | |
[0, [0, 0]], | |
[1, [0, 1]], | |
[2, [1, 1]], | |
[3, [1, 0]], | |
] | |
def encoder(st): | |
_s = st[::-1] + [0]*(K-1) | |
c = deque() | |
for index in range (0, len(_s)-2): | |
c.extendleft([_s[index], _s[index] ^ _s[index+1], _s[index] ^ _s[index+2]][::-1]) | |
return list(c) | |
def assist_symbol_to_bit(symbol_sequence: SYMBOL_SEQUENCE_TYPE, bit_to_symbol_map: BIT_TO_SYMBOL_MAP_TYPE) -> BIT_SEQUENCE_TYPE: | |
""" | |
Returns a sequence of bits that corresponds to the provided sequence of symbols containing noise using the bit to symbol map that represent the modulation scheme and the euclidean distance | |
parameters: | |
symbol_sequence -> type <class 'list'> : List containing complex items (<class "complex">) which represents the symbols for example: [1+1j, 2+2j, -0.66-0.25j] | |
bit_to_symbol_map -> type <class 'list'> : A list containing lists. Each list entry contains a complex number and a bit sequence, representing the symbol, and the corresponding bit sequence that maps to it. | |
the maps that will be used are given as MAP_BPSK and MAP_4QAM | |
Example: | |
[ | |
[ 1+1j, [0, 0] ], | |
[ -1+1j, [0, 1] ], | |
[ -1-1j, [1, 1] ], | |
[ 1-1j, [1, 0] ] | |
] | |
returns: | |
bit_sequence -> type <class 'list'> : List containing int items which represents the bits for example: [0, 1, 1] | |
""" | |
def distance_to_symbols(symbol_to_test: complex) -> List[int]: | |
distances = tuple((float(np.linalg.norm(symbol_to_test - _value)), _bits) for (_value, _bits) in bit_to_symbol_map) # type: ignore | |
# Get min distance | |
min_distance = min(distances, key=lambda _distance: _distance[0]) | |
# Return bits | |
return min_distance[1] # type: ignore | |
# Determine nearest symbol in map for every item | |
detected_bits = [distance_to_symbols(_symbol) for _symbol in symbol_sequence] | |
return [element for sublist in detected_bits for element in sublist] | |
n = 1 | |
k = 3 | |
Rc = n/k | |
K = 3 | |
g = [4, 6, 5] | |
s = [1, 0, 1, 0, 1] + [0]*(K-1) | |
c = encoder(s) | |
s = [1, 0, 0, 0, 0] + [0]*(K-1) | |
bit_to_symbol_map = MAP_S | |
# Initialize MLSE | |
mlse = MLSE(bit_to_symbol_map, []) | |
for index in range (0, len(s)-2): | |
_s = s[index:index+3] | |
C = [c[3*index], c[3*index+1], c[3*index+2]] | |
mlse.C = C | |
mlse.addReceivedSymbol(s[index]) | |
shortest_path = mlse.getShortestPath() | |
result = [_node.symbol for _node in shortest_path] | |
print(result) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment