Created
March 18, 2016 22:27
-
-
Save whutch/c8853ec9488c24b18b10 to your computer and use it in GitHub Desktop.
Game module for Clockwork that demonstrates terminal manipulation possibilities.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""Weather monitor; example of client terminal manipulation."""
# Part of Clockwork MUD Server (https://github.com/whutch/cwmud)
# :copyright: (c) 2008 - 2016 Will Hutcheson
# :license: MIT (https://github.com/whutch/cwmud/blob/master/LICENSE.txt)
from ... import settings | |
from ...contrib.weather.patterns import WeatherPattern | |
from ...contrib.worldgen.maps import _render_map_data, render_map_from_layers | |
from ...core.events import EVENTS | |
from ...core.sessions import Session, SESSIONS | |
from ...core.timing import TIMERS | |
from ...core.utils.decorators import patch | |
from ...core.utils.funcs import joins | |
from ...libs.miniboa import NAWS | |
# Zero out both idle-time settings while this module is loaded.
# NOTE(review): presumably 0 keeps passive viewers from being idled out;
# confirm how the core session code interprets a value of 0.
settings.IDLE_TIME = settings.IDLE_TIME_MAX = 0
@patch(Session)
def _parse_input(self, data):
    """Accept a chunk of client input; nothing is done with it yet.

    :param data: The input received from the client
    :returns None:

    """
    if data:
        # TODO: do something with it?
        # TODO: process ^C as quit
        pass
@patch(Session)
def _send(self, data):
    """Hand output to this session's client through its `send_cc` method.

    :param str data: The output to send
    :returns None:

    """
    client = self._client
    client.send_cc(data)
@patch(Session)
def send(self, data, *more, sep="", end=""):
    """Queue text to be sent to the client tied to this session.

    Nothing is written to the client here; the text is appended to the
    session's output queue and goes out the next time the session is
    polled.

    `data` and every member of `more` are passed to the joins function,
    which combines them using `sep`; `end` is then appended to the result.
    Unlike a line-oriented send, both `sep` and `end` default to the
    empty string.

    :param any data: An initial chunk of data
    :param any more: Optional, any additional data to send
    :param str sep: Optional, a separator to join the resulting output by
    :param str end: Optional, a terminator appended to the resulting output
    :returns None:

    """
    text = joins(data, *more, sep=sep)
    self._output_queue.append(text + end)
@patch(Session)
def poll(self, output_only=False):
    """Service this session: read one pending command, then flush output.

    :param bool output_only: Whether to only process the output queue
                             (this allows you to call poll from inside a
                             command without triggering an infinite loop)
    :returns None:

    """
    # NOTE(review): the output stage is treated as nested under the
    # active-client check, so nothing is flushed to an inactive client;
    # confirm this matches the intended structure.
    if not self._client.active:
        return
    # Input stage: pull at most one command per poll.
    if not output_only and self._client.cmd_ready and self.active:
        command = self._client.get_command()
        if command is not None:
            self._parse_input(command)
    # Output stage: collapse everything queued into a single write.
    if self._output_queue and self._client:
        pending = "".join(self._output_queue)
        self._send(pending)
        self._output_queue.clear()
# Drop whatever is hooked to "client_connected" so the default connect
# menu is not shown; the hook below takes its place.
EVENTS.unhook("client_connected")


@EVENTS.hook("client_connected")
def _hook_client_connected(client):
    """Create a session for a new client and start telnet negotiation.

    The frame-sending code in this module later reads the client's
    reported window size and terminal type, so those options are
    requested as soon as the client connects.

    :param client: The client that just connected
    :returns None:

    """
    new_session = SESSIONS.create(client)
    new_session.color = True
    telnet = new_session.client
    telnet.request_do_sga()
    telnet.request_naws()
    telnet.request_terminal_type()
    telnet.request_linemode_off()
    telnet.request_will_echo()
# Seeds are taken from the timer value at import time; the wind seed is
# produced with true division, so it is always a float.
RAIN_SEED = TIMERS.time % 10000
WIND_SEED = RAIN_SEED / 2

# Both patterns read TIMERS.time through a callable, so the value is
# looked up each time the pattern asks for it rather than once here.
RAIN_PATTERN = WeatherPattern(
    time_source=lambda: TIMERS.time,
    time_scale=2,
    formation_speed=0.25,
    storm_scale=50,
    wind_scale=20,
    seed=RAIN_SEED,
)
WIND_PATTERN = WeatherPattern(
    time_source=lambda: TIMERS.time,
    time_scale=2,
    formation_speed=0.35,
    storm_scale=100,
    wind_scale=20,
    seed=WIND_SEED,
)

# Every entry below is keyed by a (width, height, center) tuple.
MAP_SIZES = set()  # Map sizes needed by the currently connected sessions.
BASE_MAP_TILES = {}  # Rendered base map tiles, generated once per size.
MAP_STRINGS = {}  # The latest frame for each size, as a list of rows.
def get_weather_tile(rain_value, wind_value):
    """Pick the color-coded ASCII tile for one cell of weather data.

    Cells whose wind value lies within 0.03 of zero are drawn as wind
    tiles ("H" when the rain value is at least 0.7, otherwise "W").
    All other cells are drawn by rain intensity alone, and cells below
    the lowest rain threshold get no weather tile at all.

    :param float rain_value: The amount of rain, from -1 to 1
    :param float wind_value: The amount of wind, from -1 to 1
    :returns str: The ASCII tile, or None if there is no weather to draw

    """
    if -0.03 <= wind_value <= 0.03:
        return "^YH" if rain_value >= 0.7 else "^wW"
    # Ordered from heaviest to lightest so the first match wins.
    rain_tiles = ((0.6, "^CL"), (0.4, "^cR"), (0.3, "^bR"))
    for threshold, tile in rain_tiles:
        if rain_value >= threshold:
            return tile
    return None
def generate_base_map(width=80, height=24, center=(0, 0)):
    """Render and cache the base map tiles for one window size.

    The tiles are rendered without color conversion and without joining
    them into strings, then stored in BASE_MAP_TILES under the key
    (width, height, center).

    :param int width: The width of the map
    :param int height: The height of the map
    :param tuple(int, int) center: The center coordinate of the map
    :returns None:

    """
    cache_key = (width, height, center)
    map_layers = _render_map_data(width, height, center=center)
    BASE_MAP_TILES[cache_key] = render_map_from_layers(
        *map_layers, convert_color=False, join_tiles=False)
def _format_weather_sidebar():
    """Build the status sidebar from the current state of both patterns.

    The sidebar shows the rain pattern's seed and time offset, an XY
    offset and wind movement for each pattern, and the storm legend.

    :returns list(str): The sidebar text, one entry per display line

    """
    # NOTE(review): index 1 of the private _offset_x/_offset_y attributes
    # is displayed as the storm's XY position; the meaning of that index
    # is not visible from this module.
    return ("^MSeed: {:>#9.8}\n^YTime: {:>#6.5g}\n\n"
            "^BRain Storms:\n"
            "^GXY: {:=+#8g} {:=+#8g}\n^CMovement: {} @ {:#.4}^~\n\n"
            "^BWind Storms:\n"
            "^GXY: {:=+#8g} {:=+#8g}\n^CMovement: {} @ {:#.4}^~\n\n"
            "^wStorm Legend:\n"
            "^KLight ^bR^Kain\n^KHeavy ^cR^Kain\n"
            "^CL^Kightning\n^KStrong ^wW^Kind\n"
            "^YH^Kurricane Winds!^~"
            .format(RAIN_PATTERN.seed,
                    round(RAIN_PATTERN.time_offset, 3),
                    round(RAIN_PATTERN._offset_x[1], 3),
                    round(RAIN_PATTERN._offset_y[1], 3),
                    RAIN_PATTERN.get_wind_direction(),
                    round(RAIN_PATTERN.get_wind_speed(), 3),
                    round(WIND_PATTERN._offset_x[1], 3),
                    round(WIND_PATTERN._offset_y[1], 3),
                    WIND_PATTERN.get_wind_direction(),
                    round(WIND_PATTERN.get_wind_speed(), 3))
            .split("\n"))


def update_weather_data():
    """Update the current weather data for all window sizes.

    MAP_SIZES is rebuilt from the sessions whose window size has been
    negotiated, both weather patterns are advanced, and a fresh frame
    (a list of row strings) is stored in MAP_STRINGS for every size in
    use.  Base map tiles are generated the first time a size is seen.

    Sessions whose reported window leaves no room for a map (zero or
    negative width or height once the reserved columns and rows are
    subtracted) are skipped, as a client may report a tiny or
    unspecified window size.

    :returns None:

    """
    MAP_SIZES.clear()
    for session in SESSIONS.all():
        if session.client.telnet_opt_dict[NAWS].reply_pending:
            # Their screen size is still being negotiated.
            continue
        width, height = session.client.columns - 25, session.client.rows - 2
        if width <= 0 or height <= 0:
            # The window size comes from the client and cannot be trusted
            # to leave any space for the map itself.
            continue
        center = (0, 0)  # TODO: option sent from client?
        MAP_SIZES.add((width, height, center))
    RAIN_PATTERN.update()
    WIND_PATTERN.update()
    for size in MAP_SIZES:
        width, height, center = size
        if size not in BASE_MAP_TILES:
            generate_base_map(width, height, center)
        base_tiles = BASE_MAP_TILES[size]
        rain_data = RAIN_PATTERN.generate_layer(width, height, center=center,
                                                octaves=2, fine_noise=0.1)
        wind_data = WIND_PATTERN.generate_layer(width, height, center=center,
                                                octaves=2, fine_noise=0.05)
        # TODO: precolor strings for efficiency?
        sidebar = _format_weather_sidebar()
        rows = []
        for y, rain_row in enumerate(rain_data):
            wind_row = wind_data[y]
            tiles = []
            for x, rain_value in enumerate(rain_row):
                tile = get_weather_tile(rain_value, wind_row[x])
                if tile is None:
                    # No weather here, so the terrain shows through.
                    tile = base_tiles[y][x]
                tiles.append(tile)
            tiles.append("^~")
            row = "".join(tiles)
            if y < len(sidebar):
                # Sidebar lines run down the right-hand side of the map.
                row = row + " " + sidebar[y]
            rows.append(row)
        MAP_STRINGS[size] = rows
def send_weather_frame_smart(session):
    """Draw the current weather frame in place using terminal escapes.

    The first call for a session clears the screen, writes the title on
    the top line, sets a scrolling region and parks the cursor below it,
    then flags the session as "ready".  Each frame is then written row by
    row with absolute cursor positioning, starting at screen line 2, with
    the cursor position saved beforehand and restored afterwards.

    :param Session session: The session to send the weather frame to
    :returns None:

    """
    # TODO: transition messages, "Please wait.." etc
    client = session.client
    if client.telnet_opt_dict[NAWS].reply_pending:
        # We're not done negotiating their screen size.
        return
    width = client.columns - 25
    height = client.rows - 2
    escape = chr(27)
    if "ready" not in session.flags:
        # One-time window setup.
        session.send(escape, "[2J")  # Clear their screen.
        session.send(escape, "[1;1H")  # Move to the top-left corner.
        session.send("AccuWeather 5000 Storm Tracking System")
        # Scrolling region from line 2 down to the last map line.
        session.send(escape, "[2;{}r".format(height + 1))
        # Leave the cursor on the line just below the map.
        session.send(escape, "[{};1H".format(height + 2))
        # TODO: a "Command: " prompt could be sent here.
        session.flags.add("ready")
    map_key = (width, height, (0, 0))
    if map_key not in MAP_SIZES:
        # No frame has been generated for this window size yet.
        return
    session.send(escape, "7")  # Save cursor position.
    for screen_line, text in enumerate(MAP_STRINGS[map_key], 2):
        # TODO: keep set of last frame and only send delta
        session.send(escape, "[{};1H".format(screen_line), text)
    session.send(escape, "8")  # Restore cursor position.
def send_weather_frame_dumb(session):
    """Send the current weather frame as plain lines, with no cursor control.

    The title and every row of the frame are sent in order, each ending
    with a newline.

    :param Session session: The session to send the weather frame to
    :returns None:

    """
    client = session.client
    if client.telnet_opt_dict[NAWS].reply_pending:
        # We're not done negotiating their screen size.
        return
    map_key = (client.columns - 25, client.rows - 2, (0, 0))
    if map_key not in MAP_SIZES:
        # No frame has been generated for this window size yet.
        return
    session.send("\nAccuWeather 5000 Storm Tracking System\n")
    for text in MAP_STRINGS[map_key]:
        session.send(text, "\n")
@TIMERS.create("10 pulses", repeat=-1)
def _update_frame():
    """Regenerate the weather data and send a frame to every session.

    Registered as a timer with a "10 pulses" duration.
    NOTE(review): repeat=-1 presumably means repeat forever; confirm
    against the timer implementation.

    :returns None:

    """
    update_weather_data()
    for session in SESSIONS.all():
        terminal = session.client.terminal_type.lower()
        # Terminals listed here get the plain line-by-line frame instead
        # of the cursor-addressed one.
        if terminal in ("blowtorch",):
            send_frame = send_weather_frame_dumb
        else:
            send_frame = send_weather_frame_smart
        send_frame(session)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment