Created
March 31, 2024 22:11
-
-
Save MatthewWilkes/e817df454fd814d0568901d072ece100 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
import tidal
import utime

# The peripheral I2C bus is enabled before i2c/i2cp are imported from tidal.
# NOTE(review): presumably i2cp is only usable after this call - keep the
# statement order unless confirmed otherwise.
tidal.enable_peripheral_I2C()
from tidal import i2c, i2cp
class RAMBlockDev:
    """Block device backed by a single in-memory bytearray.

    The storage is ``block_size * num_blocks`` zero-initialised bytes held in
    ``self.data``; blocks are addressed by index and an optional byte offset.
    """

    def __init__(self, block_size, num_blocks):
        """Allocate ``num_blocks`` blocks of ``block_size`` bytes each."""
        self.block_size = block_size
        self.data = bytearray(block_size * num_blocks)

    def readblocks(self, block_num, buf, offset=0):
        """Fill ``buf`` from storage, starting ``offset`` bytes into block ``block_num``."""
        start = block_num * self.block_size + offset
        for pos in range(len(buf)):
            buf[pos] = self.data[start + pos]

    def writeblocks(self, block_num, buf, offset=None):
        """Copy ``buf`` into storage at block ``block_num``.

        When ``offset`` is None, every whole block covered by ``buf`` is first
        passed to ``ioctl(6, ...)`` (erase) and the data is written from the
        start of the block; otherwise the data is written ``offset`` bytes
        into the block with no erase step.
        """
        if offset is None:
            # Erase-then-write: one erase request per whole block in buf.
            whole_blocks = len(buf) // self.block_size
            for target in range(block_num, block_num + whole_blocks):
                self.ioctl(6, target)
            offset = 0
        start = block_num * self.block_size + offset
        for pos, value in enumerate(buf):
            self.data[start + pos] = value

    def ioctl(self, op, arg):
        """Answer a control request; returns None for any op not listed below."""
        if op == 6:  # block erase: nothing to do for RAM
            return 0
        if op == 5:  # block size
            return self.block_size
        if op == 4:  # block count
            return len(self.data) // self.block_size
        return None
class EEPROM:
    """Block-device interface to an I2C EEPROM that takes a 2-byte word address.

    ``bus`` must provide ``writeto(addr, data)`` and
    ``readfrom_into(addr, buf)``; ``address`` is the device's I2C address.
    Every bus transaction is retried on ``OSError`` (the device not
    acknowledging) until it succeeds. The retries are unbounded, so a device
    that never answers will block the caller.
    """

    # NOTE(review): presumably the chip's write-page size; nothing in the
    # live code reads it - confirm before relying on it.
    INTERNAL_BLOCK_SIZE = 32
    BLOCK_SIZE = 128  # bytes per block, reported by ioctl(5)
    CAPACITY = 4096  # total bytes; ioctl(4) reports CAPACITY // BLOCK_SIZE

    def __init__(self, bus, address):
        """Remember the I2C bus object and the device address to talk to."""
        self.bus = bus
        self.address = address

    def block_address(self, block_origin, block_offset, offset):
        """Return the 2-byte, high-byte-first memory address of a byte.

        The byte lives ``offset`` bytes into block
        ``block_origin + block_offset``. Only the low 16 bits of the computed
        address are kept.
        """
        address = ((block_origin + block_offset) * self.BLOCK_SIZE) + offset
        return bytes(((address >> 8) & 0xFF, address & 0xFF))

    def readblocks(self, block_num, buf, offset=0):
        """Fill ``buf`` from the device, starting ``offset`` bytes into ``block_num``."""
        print(f"readblocks({block_num}, buf<{len(buf)}>, {offset})")
        self.retrying_read(self.block_address(block_num, 0, offset), buf)

    def retrying_read(self, address, buf):
        """Send the 2-byte ``address`` then read into ``buf``, retrying on OSError.

        Both bus calls are repeated together after a 0.1 s pause whenever
        either raises ``OSError``.
        """
        while True:
            try:
                self.bus.writeto(self.address, address)
                self.bus.readfrom_into(self.address, buf)
            except OSError:
                utime.sleep(0.1)
            else:
                return

    def retrying_write(self, data, initial_delay=1):
        """Write ``data`` (address bytes + payload) to the device, retrying on OSError.

        ``initial_delay`` is the number of seconds to wait before the first
        attempt. It defaults to the 1 s this method has always waited;
        ``writeblocks`` passes 0 so per-byte writes are not slowed down.
        Failed attempts are repeated after a 0.1 s pause.
        """
        if initial_delay:
            utime.sleep(initial_delay)
        while True:
            try:
                self.bus.writeto(self.address, data)
            except OSError:
                utime.sleep(0.1)
            else:
                return

    def writeblocks(self, block_num, buf, offset=0):
        """Write ``buf`` to the device one byte per I2C transaction.

        Each byte is sent as ``<addr_hi><addr_lo><value>`` and followed by a
        5 ms pause. A transaction that raises ``OSError`` is retried instead
        of aborting the whole write, matching how reads are handled.
        """
        for index, value in enumerate(buf):
            frame = self.block_address(block_num, 0, offset + index) + bytes((value,))
            self.retrying_write(frame, initial_delay=0)
            # Pause after every byte before addressing the device again.
            # NOTE(review): presumably the chip's write-cycle time - confirm
            # against the datasheet.
            utime.sleep(0.005)

    def ioctl(self, op, arg):
        """Answer a control request; returns None for any op not listed below."""
        print(f"ioctl({op}, {arg})")
        if op == 4:  # get number of blocks
            return self.CAPACITY // self.BLOCK_SIZE
        if op == 5:  # get block size
            return self.BLOCK_SIZE
        if op == 6:  # erase block: overwrite block ``arg`` with zero bytes
            self.writeblocks(arg, bytearray(self.BLOCK_SIZE))
            return 0
# Wrap every address reported by i2cp.scan() in an EEPROM driver object.
# Each responding address is wrapped; nothing here checks the device type.
eeproms = [
    EEPROM(i2cp, device_address)
    for device_address in i2cp.scan()
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment