Created
January 7, 2022 22:25
-
-
Save Cheaterman/59fe126731291956fb98b63904ec664d to your computer and use it in GitHub Desktop.
vehicles.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from dataclasses import dataclass, field | |
from typing import ClassVar, Dict, List, Optional, Tuple | |
from samp import ( | |
PLAYER_STATE_DRIVER, | |
PLAYER_STATE_ONFOOT, | |
AddVehicleComponent, | |
CallRemoteFunction, | |
ChangeVehicleColor, | |
ChangeVehiclePaintjob, | |
CreateVehicle, | |
DeletePVar, | |
DestroyVehicle, | |
GetPlayerMoney, | |
GetPlayerName, | |
GetPlayerPos, | |
GetPlayerVehicleID, | |
GetPVarInt, | |
GetVehicleModel, | |
GetVehicleParamsEx, | |
GetVehiclePos, | |
GetVehicleZAngle, | |
GivePlayerMoney, | |
IsPlayerInAnyVehicle, | |
KillTimer, | |
SendClientMessage, | |
SetTimer, | |
SetVehicleParamsEx, | |
) | |
from .cmdparser import cmd | |
from .elevators import distance | |
from .model import ( | |
Dealership as DBDealership, | |
Vehicle as DBVehicle, | |
session as sessionmaker, | |
) | |
from .streamer import ( | |
CreateDynamic3DTextLabel, | |
DestroyDynamic3DTextLabel, | |
) | |
from .tms import admin_level, is_logged_in, user_id | |
from .vehicle_data import vehicles | |
@dataclass | |
class Dealership: | |
name: str | |
x: float | |
y: float | |
z: float | |
label_id: int | |
id: Optional[int] = None | |
dealerships: ClassVar[List['Dealership']] = [] | |
@classmethod | |
def find_closest( | |
cls, | |
coords: Tuple[float, float, float], | |
radius: float = 50, | |
): | |
for dealership in cls.dealerships: | |
if distance(coords, (dealership.x, dealership.y)) < radius: | |
return dealership | |
@classmethod | |
def create( | |
cls, | |
name: str, | |
coords: Tuple[float, float, float], | |
id: Optional[int] = None, | |
): | |
x, y, z = coords | |
label_id = CreateDynamic3DTextLabel( | |
text=name, | |
color=0x80FF80FF, | |
x=x, | |
y=y, | |
z=z, | |
drawdistance=50.0, | |
testlos=True, | |
) | |
dealership = cls( | |
name=name, | |
x=x, | |
y=y, | |
z=z, | |
label_id=label_id, | |
id=id, | |
) | |
cls.dealerships.append(dealership) | |
return dealership | |
def destroy(self): | |
DestroyDynamic3DTextLabel(self.label_id) | |
self.dealerships.remove(self) | |
@dataclass | |
class Vehicle: | |
model_id: int | |
color_1: int | |
color_2: int | |
price: int | |
x: float | |
y: float | |
z: float | |
rotation: float | |
vehicle_id: Optional[int] = None | |
label_id: Optional[int] = None | |
paintjob: Optional[int] = None | |
mods: Dict[int, int] = field(default_factory=dict) | |
dealership: Optional[Dealership] = None | |
owner: Optional[str] = None | |
id: Optional[int] = None | |
_respawn_timer_id: Optional[int] = None | |
_antitheft_timer_id: Optional[int] = None | |
RESPAWN_DELAY: ClassVar[int] = 300 # Seconds | |
vehicles: ClassVar[Dict[int, 'Vehicle']] = {} | |
def __post_init__(self): | |
self._old_vehicle_id = None | |
@classmethod | |
def create( | |
cls, | |
model_id: int, | |
color_1: int, | |
color_2: int, | |
price: int, | |
x: float, | |
y: float, | |
z: float, | |
rotation: float, | |
paintjob: Optional[int] = None, | |
mods: Dict[int, int] = {}, | |
dealership: Optional[Dealership] = None, | |
owner: Optional[str] = None, | |
id: Optional[int] = None, | |
): | |
vehicle = cls( | |
model_id=model_id, | |
color_1=color_1, | |
color_2=color_2, | |
price=price, | |
x=x, | |
y=y, | |
z=z, | |
rotation=rotation, | |
paintjob=paintjob, | |
mods=mods, | |
dealership=dealership, | |
owner=owner, | |
id=id, | |
) | |
vehicle.create_vehicle() | |
cls.vehicles[vehicle.vehicle_id] = vehicle | |
return vehicle | |
def create_vehicle(self): | |
self.vehicle_id = vehicle_id = CreateVehicle( | |
self.model_id, | |
self.x, | |
self.y, | |
self.z, | |
self.rotation, | |
self.color_1, | |
self.color_2, | |
-1, | |
) | |
SetVehicleParamsEx( | |
vehicle_id, | |
False, | |
False, | |
False, | |
False, | |
False, | |
False, | |
False, | |
) | |
CallRemoteFunction('SetVehicleGas', vehicle_id, 100.0) | |
if self._old_vehicle_id is not None: | |
del self.vehicles[self._old_vehicle_id] | |
self.vehicles[vehicle_id] = self | |
self._old_vehicle_id = None | |
ChangeVehicleColor(vehicle_id, self.color_1, self.color_2) | |
if self.paintjob is not None: | |
ChangeVehiclePaintjob(vehicle_id, self.paintjob) | |
valid_components = vehicles[self.model_id].valid_components | |
for mod_id in self.mods.values(): | |
if( | |
mod_id is None | |
or mod_id not in valid_components | |
): | |
continue | |
AddVehicleComponent(vehicle_id, mod_id) | |
self.label_id = None | |
if not self.owner: | |
self.label_id = CreateDynamic3DTextLabel( | |
text=( | |
f'Buy for ${self.price}' | |
+ ( | |
f'\nat {self.dealership.name}' | |
if self.dealership | |
else '' | |
) | |
), | |
color=0x80FF80FF, | |
x=self.x, | |
y=self.y, | |
z=self.z, | |
drawdistance=50.0, | |
attachedvehicle=vehicle_id, | |
testlos=True, | |
) | |
def destroy(self): | |
del self.vehicles[ | |
self.vehicle_id | |
if self.vehicle_id is not None | |
else self._old_vehicle_id | |
] | |
self.destroy_vehicle() | |
self.stop_antitheft() | |
def destroy_vehicle(self): | |
self._kill_respawn_timer() | |
self.destroy_label() | |
if self.vehicle_id is not None: | |
DestroyVehicle(self.vehicle_id) | |
self.vehicle_id = None | |
def destroy_label(self): | |
if self.label_id is None: | |
return | |
DestroyDynamic3DTextLabel(self.label_id) | |
self.label_id = None | |
def respawn(self): | |
if self.vehicle_id is None: | |
return | |
self._old_vehicle_id = self.vehicle_id | |
self.destroy_vehicle() | |
self._start_respawn_timer() | |
def _start_respawn_timer(self): | |
self._respawn_timer_id = SetTimer( | |
self.create_vehicle, | |
self.RESPAWN_DELAY * 1000, | |
False | |
) | |
def _kill_respawn_timer(self): | |
if self._respawn_timer_id is None: | |
return | |
KillTimer(self._respawn_timer_id) | |
self._respawn_timer_id = None | |
def sell(self, playerid: int): | |
if self.owner: | |
return | |
self.dealership = None | |
self.owner = GetPlayerName(playerid) | |
self.destroy_label() | |
self.stop_antitheft() | |
GivePlayerMoney(playerid, -self.price) | |
SetVehicleParamsEx( | |
self.vehicle_id, | |
True, | |
False, | |
False, | |
False, | |
False, | |
False, | |
False, | |
) | |
def start_antitheft(self, playerid: int): | |
if self._antitheft_timer_id is not None: | |
return | |
self._antitheft_timer_id = SetTimer( | |
self._antitheft_check, | |
100, | |
True, | |
playerid, | |
) | |
def stop_antitheft(self): | |
if self._antitheft_timer_id is None: | |
return | |
if self.vehicle_id is not None: | |
SetVehicleParamsEx( | |
self.vehicle_id, | |
False, | |
False, | |
False, | |
False, | |
False, | |
False, | |
False, | |
) | |
KillTimer(self._antitheft_timer_id) | |
self._antitheft_timer_id = None | |
def _antitheft_check(self, playerid: int): | |
vehicle_id = self.vehicle_id | |
if( | |
vehicle_id is None | |
or GetPlayerVehicleID(playerid) != vehicle_id | |
): | |
self.stop_antitheft() | |
return | |
engine, *_ = GetVehicleParamsEx(vehicle_id) | |
if not engine: | |
return | |
SetVehicleParamsEx( | |
vehicle_id, | |
False, | |
False, | |
True, # Alarm | |
False, | |
False, | |
False, | |
False, | |
) | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You do not own this vehicle. Use /buyvehicle to buy it.', | |
) | |
def OnGameModeInit(): | |
with sessionmaker() as session: | |
for db_dealership in session.query(DBDealership): | |
Dealership.create( | |
name=db_dealership.name, | |
coords=(db_dealership.x, db_dealership.y, db_dealership.z), | |
id=db_dealership.id, | |
) | |
dealerships_by_id = { | |
dealership.id: dealership | |
for dealership in Dealership.dealerships | |
} | |
for db_vehicle in session.query(DBVehicle): | |
Vehicle.create( | |
model_id=db_vehicle.model_id, | |
color_1=db_vehicle.color_1, | |
color_2=db_vehicle.color_2, | |
paintjob=db_vehicle.paintjob, | |
mods=db_vehicle.mods, | |
price=db_vehicle.price, | |
x=db_vehicle.x, | |
y=db_vehicle.y, | |
z=db_vehicle.z, | |
rotation=db_vehicle.rotation, | |
dealership=dealerships_by_id.get(db_vehicle.dealership_id), | |
owner=db_vehicle.owner and db_vehicle.owner.name, | |
id=db_vehicle.id, | |
) | |
def OnGameModeExit(): | |
for dealership in Dealership.dealerships[:]: | |
dealership.destroy() | |
for vehicle in Vehicle.vehicles.copy().values(): | |
vehicle.destroy() | |
def OnVehicleSpawn(vehicle_id): | |
vehicle_colors.pop(vehicle_id, None) | |
vehicle_paintjobs.pop(vehicle_id, None) | |
vehicle_mods.pop(vehicle_id, None) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
return | |
vehicle.respawn() | |
return False | |
vehicle_colors = {} | |
def OnVehicleRespray(player_id, vehicle_id, color_1, color_2): | |
vehicle_colors[vehicle_id] = (color_1, color_2) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
return | |
db_vehicle_id = vehicle.id | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(db_vehicle_id) | |
if db_vehicle: | |
db_vehicle.color_1 = color_1 | |
db_vehicle.color_2 = color_2 | |
session.commit() | |
vehicle_paintjobs = {} | |
def OnVehiclePaintjob(player_id, vehicle_id, paintjob_id): | |
vehicle_paintjobs[vehicle_id] = paintjob_id | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
return | |
db_vehicle_id = vehicle.id | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(db_vehicle_id) | |
if db_vehicle: | |
db_vehicle.paintjob = paintjob_id | |
session.commit() | |
vehicle_mods = {} | |
def OnVehicleMod(player_id, vehicle_id, component_id): | |
model_id = GetVehicleModel(vehicle_id) | |
vehicle = vehicles[model_id] | |
if component_id not in vehicle.valid_components: | |
return | |
component = vehicle.valid_components[component_id] | |
component_slot_id = component.type.id | |
vehicle_mods.setdefault(vehicle_id, {})[component_slot_id] = component.id | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
return | |
db_vehicle_id = vehicle.id | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(db_vehicle_id) | |
if db_vehicle: | |
setattr(db_vehicle, f'mod_{component_slot_id}', component_id) | |
session.commit() | |
def OnPlayerStateChange(playerid, new_state, old_state): | |
if not ( | |
old_state == PLAYER_STATE_ONFOOT | |
and new_state == PLAYER_STATE_DRIVER | |
): | |
return | |
vehicle = Vehicle.vehicles.get(GetPlayerVehicleID(playerid)) | |
if( | |
not vehicle | |
or vehicle.owner | |
): | |
return | |
vehicle.start_antitheft(playerid) | |
@cmd('adddealership', raw=True, requires=admin_level(5)) | |
def adddealership(playerid, name=''): | |
if not name: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'USAGE: /adddealership name', | |
) | |
return | |
x, y, z = GetPlayerPos(playerid) | |
dealership = Dealership.find_closest(GetPlayerPos(playerid), radius=100) | |
if dealership: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: Already in range of an existing dealership.', | |
) | |
return | |
dealership = Dealership.create( | |
name=name, | |
coords=(x, y, z), | |
) | |
with sessionmaker() as session: | |
db_dealership = DBDealership( | |
name=name, | |
x=x, | |
y=y, | |
z=z, | |
) | |
session.add(db_dealership) | |
session.commit() | |
dealership.id = db_dealership.id | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
'Dealership was successfully added.', | |
) | |
@cmd('deldealership', requires=admin_level(5)) | |
def deldealership(playerid): | |
dealership = Dealership.find_closest(GetPlayerPos(playerid)) | |
if not dealership: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: Not in range of any dealership.', | |
) | |
return | |
with sessionmaker() as session: | |
db_dealership = session.query(DBDealership).get(dealership.id) | |
if db_dealership: | |
session.delete(db_dealership) | |
session.commit() | |
dealership.destroy() | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
'Dealership was successfully removed.', | |
) | |
@cmd('addvehicle', requires=admin_level(4)) | |
def addvehicle(playerid, price_str=''): | |
if not price_str: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'USAGE: /addvehicle price', | |
) | |
return | |
price = 0 | |
try: | |
price = int(price_str) | |
except ValueError: | |
pass | |
if price <= 0: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: Invalid price (must be positive integer).', | |
) | |
return | |
if not IsPlayerInAnyVehicle(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must be in a vehicle.', | |
) | |
return | |
vehicle_id = GetPlayerVehicleID(playerid) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if vehicle: | |
message = ( | |
f'This vehicle already belongs to {vehicle.owner}.' | |
) if vehicle.owner else ( | |
f'This vehicle is already for sale for ${vehicle.price} ' | |
f'at {vehicle.dealership.name}.' | |
) | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
f'ERROR: {message}', | |
) | |
return | |
x, y, z = GetVehiclePos(vehicle_id) | |
dealership = Dealership.find_closest((x, y, z)) | |
if not dealership: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: Not in range of any dealership.', | |
) | |
return | |
model_id = GetVehicleModel(vehicle_id) | |
color_1, color_2 = vehicle_colors.get( | |
vehicle_id, | |
(random.randrange(256), random.randrange(256)), | |
) | |
paintjob = vehicle_paintjobs.get(vehicle_id) | |
mods = vehicle_mods.get(vehicle_id, {}) | |
rotation = GetVehicleZAngle(vehicle_id) | |
DestroyVehicle(vehicle_id) | |
if GetPVarInt(playerid, 'plvehicle') == vehicle_id: | |
DeletePVar(playerid, 'plvehicle') | |
vehicle = Vehicle.create( | |
model_id=model_id, | |
color_1=color_1, | |
color_2=color_2, | |
paintjob=paintjob, | |
mods=mods, | |
price=price, | |
x=x, | |
y=y, | |
z=z, | |
rotation=rotation, | |
dealership=dealership, | |
) | |
with sessionmaker() as session: | |
db_vehicle = DBVehicle( | |
model_id=model_id, | |
color_1=color_1, | |
color_2=color_2, | |
paintjob=paintjob, | |
mods=mods, | |
price=price, | |
x=x, | |
y=y, | |
z=z, | |
rotation=rotation, | |
dealership_id=dealership.id, | |
) | |
session.add(db_vehicle) | |
session.commit() | |
vehicle.id = db_vehicle.id | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
'Vehicle was successfully added.', | |
) | |
@cmd('delvehicle', requires=admin_level(4)) | |
def delvehicle(playerid): | |
if not IsPlayerInAnyVehicle(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must be in a vehicle.', | |
) | |
return | |
vehicle_id = GetPlayerVehicleID(playerid) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: This vehicle is not for sale.', | |
) | |
return | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(vehicle.id) | |
session.delete(db_vehicle) | |
session.commit() | |
model_name = vehicles[vehicle.model_id].name | |
vehicle.destroy() | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
f'You successfully deleted this {model_name}.', | |
) | |
@cmd | |
def buyvehicle(playerid): | |
if not IsPlayerInAnyVehicle(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must be in a vehicle.', | |
) | |
return | |
vehicle_id = GetPlayerVehicleID(playerid) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: This vehicle is not for sale.', | |
) | |
return | |
if vehicle.owner: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
f'ERROR: This vehicle is already owned by {vehicle.owner}.', | |
) | |
return | |
if GetPlayerMoney(playerid) < vehicle.price: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
f'ERROR: You need at least ${vehicle.price} to buy this vehicle.', | |
) | |
return | |
if not is_logged_in(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must log in to buy a vehicle.', | |
) | |
return | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(vehicle.id) | |
db_vehicle.dealership_id = None | |
db_vehicle.owner_id = user_id(playerid) | |
session.commit() | |
vehicle.sell(playerid) | |
model_name = vehicles[vehicle.model_id].name | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
f'You successfully bought this {model_name} for ${vehicle.price}.', | |
) | |
@cmd | |
def savevehicle(playerid): | |
if not IsPlayerInAnyVehicle(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must be in a vehicle.', | |
) | |
return | |
vehicle_id = GetPlayerVehicleID(playerid) | |
vehicle = Vehicle.vehicles.get(vehicle_id) | |
if not vehicle: | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: This vehicle is not owned.', | |
) | |
return | |
if vehicle.owner != GetPlayerName(playerid): | |
message = ( | |
'' if not vehicle.owner | |
else f': it is owned by {vehicle.owner}' | |
) | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
f'ERROR: This vehicle not yours{message}.', | |
) | |
return | |
if not is_logged_in(playerid): | |
SendClientMessage( | |
playerid, | |
0xFF0000FF, | |
'ERROR: You must log in to save a vehicle.', | |
) | |
return | |
x, y, z = GetVehiclePos(vehicle_id) | |
rotation = GetVehicleZAngle(vehicle_id) | |
with sessionmaker() as session: | |
db_vehicle = session.query(DBVehicle).get(vehicle.id) | |
db_vehicle.x = x | |
db_vehicle.y = y | |
db_vehicle.z = z | |
db_vehicle.rotation = rotation | |
session.commit() | |
model_name = vehicles[vehicle.model_id].name | |
SendClientMessage( | |
playerid, | |
0x4080FFFF, | |
f'Your {model_name} position and rotation were successfully saved.', | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment