Эволюция игрового серверного фреймворка на Python. Часть 1 из 2. Слои инфраструктуры

Kate

Administrator
Команда форума
Допустим, у нас большие планы, и мы хотим реализовать серверную часть для всех основных игровых жанров. Однако, прежде, чем приступить к этому, нужно хорошенько подготовиться. Нужно создать такую основу, которая бы подходила для каждой игры, чтобы потом не нужно было ничего переделывать на пол-пути. В том числе и все сделанные на тот момент игры.

Главная наша задача — совместить несовместимое, выработать такое решение, которое бы позволяло создавать игровые приложения любой сложности качественно и максимально быстро! Сочетание этих несовместимых, казалось бы, условий обеспечит нам фреймворк, эволюцию которого мы и намерены здесь проследить. В первой статье будет описано создание инфраструктурного фреймворка, а во второй — разработка логики на его основе. Всего — две статьи на описание методологии разработки всей серверной части.

В качестве языка программирования выберем Python за его простоту и элегантность. Мы начнем в сокетов (asyncio), а закончим HTTP-сервером. Наша задача состоит в том, чтобы код логики не зависел от типа сервера и задействованных сетевых протоколов.

Исходная точка​

При разборе клиента мы проследили, как из отображения выделяется логика — сначала модель, а потом и контроллер. Потом мы все игровые действия вместо вызовов методов сделали простыми объектами-командами. Так как отображение и логика взаимодействуют исключительно такими командами, то отображению становится все равно, где находится логика — в том же приложении или в другом, запущенном на иной машине, расположенной за тысячи километров. На клиенте контроллер без логики, таким образом, превращается в простой шлюз между отображением и логикой, между клиентом и сервером.

На сервере, прежде чем команды дойдут до логики, мы должны установить соединение с клиентом и принять от него сообщения. После этого мы декодируем их в объекты команд и передаем их соответствующей функции бизнес-логики на обработку. Таким образом, вокруг логики выстраивается целый массив инфраструктуры, которую нам предстоит упорядочить. Чтобы разобраться в ней самым внимательным образом, начнем с самого простого примера сокет-сервера.

Про то, как реализованы сокеты в Python, мы писали в другом месте. Тут мы продолжим с того, на чем закончили там — с TCP-сервера на asyncio. Добавим в него кодирование и обработку сообщений, а также возможность отправлять их другим клиентам. В результате чего наша первая версия программы будет выглядеть так:

last_index = 0
writers = []

async def handle_connection(reader, writer):
global last_index
global writers
writers.append(writer)
last_index += 1
index = last_index
print("+Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
break # Disconnected by client
request_bytes = unparsed_bytes + request_bytes
request_bytes_list = request_bytes.split(b"\x00")
unparsed_bytes = request_bytes_list.pop()

# Process
for request_bytes in request_bytes_list:
if not request_bytes:
continue
request = request_bytes.decode("utf8")
print(" >> Received: {repr(request)}")
try:
command = json.loads(request)
to_self_command, to_all_command = await handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {e}")
to_self_command, to_all_command = {"error": str(e)}, None
self_response = json.dumps(to_self_command) if to_self_command else None
all_response = json.dumps(to_all_command) if to_all_command else None
print(f" << Send: {repr(self_response)} as self_response and commands: "
f"{repr(all_response)} to all {len(writers)} connections")
if self_response:
to_self_bytes = self_response.encode("utf8") + b"\x00"
try:
writer.write(to_self_bytes)
await writer.drain()
except ConnectionError:
pass # Yet must send to others
if all_response:
to_all_bytes = all_response.encode("utf8") + b"\x00"
for w in writers:
try:
w.write(to_all_bytes)
except ConnectionError:
continue
await asyncio.gather(w.drain() for w in self.writers)
writers.remove(writer)
writer.close()
print(f"-Disconnected")

async def main(host, port):
print(f"Start server: {host}:{port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()

HOST, PORT = "", 5554
if __name__ == "__main__":
asyncio.run(main(HOST, PORT))
Это был код, отвечающий за пересылку и первичную обработку сообщений. Бизнес-логика реализована в handle_command(). В ней реализовано всего три команды: взять, поместить и изменить (get, set, update). Так как в логике могут присутствовать запросы к базам данным или внешним сервисам, то, чтобы не блокировать выполнение программы на это время, данная функция также сделана асинхронной (async):

storage = {}

async def handle_command(index, command):
global storage
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
return {"success": True, **command, "state": state}, None
elif code == "set":
state = command.get("state")
storage[key] = state
return {"success": True, **command}, None
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
return {"success": False, **command}, None
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
return None, {"success": True, **command}
return None, None
Пока что все соединения сохраняются в простой массив writers, а в обработчике команд (handle_command()) мы различаем только два типа адресатов для ответных команд: назад к себе (to_self_command), и ко всем (to_all_command). Это самый примитивный подход и в одном из следующих примеров мы заменим его на перечисление индивидуальных адресатов по их индексам. Обработчик команд будет возвращать массив кортежей, каждый из которых в первом элементе будет содержать список индексов соединений, а во втором список команд, которые им предназначаются: result = [(indexes, commands), (indexes2, commands2)]. Соответственно, все соединения должны будут сохраняться не в списке writers, как раньше, а в словаре writer_by_index. Но пока вернемся к архитектуре приложения.

Разделение логики и инфраструктуры​

Сейчас наше приложение цельное и неразделимое, а потому максимально "нереюзабельное". Другими словами, мы не можем использовать отдельные его части повторно, так как все они жестко связаны друг с другом. Хоть логика и вынесена в отдельную функцию handle_command(), но в handle_connection() вместо нее нельзя подставить другую. Если мы захотим написать новое приложение с другой логикой, то нам придется копировать и handle_connection(). Конечно, нас это не устраивает.

В качестве быстрого решения можно передавать ссылку на функцию с логикой (handle_command()) в параметрах handle_connection(). Или можно вынести обе функции в класс, а в подклассах их переопределять. Но лучше всего логику (обработку команд) и инфраструктуру (обмен командами по сети) вообще реализовать в двух разных классах.

Преимущество классов перед отдельными функциями в том, что в них можно хранить собственный контекст из переменных-членов (атрибутов), а также группировать и переопределять в подклассах методы.

Разделение приложения на инфраструктуру (Server) и логику (Logic)


Разделение приложения на инфраструктуру (Server) и логику (Logic)
Разделение на два класса задает два магистральных пути развития в разработке сервера. С одной стороны мы можем подготовить разные типы и версии функционала по передаче объектов (Server), с другой — наделать кучу разных игр (Logic). И все это независимо друг от друга. Единственное условие, которое должно соблюдаться, это чтобы класс логики содержал метод с такой сигнатурой: handle_command(self, index, command):

class SocketServer:
def __init__(self, logic, host, port):
self.logic = logic
self.host = host
self.port = port
self.writers = []

def run(self):
asyncio.run(self.main())

async def main(self):
print(f"Start server: {self.host}:{self.port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()

async def handle_connection(self, reader, writer):
...
# Call:
# to_self_command, to_all_command = await self.logic.handle_command(index, command)
# instead of:
# to_self_command, to_all_command = await handle_command(index, command)

class MyLogic:
# global storage -> self.storage
def __init__(self):
self.storage = {}

async def handle_command(self, index, command):
...
Код запуска при этом немного изменится:

HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(MyLogic(), HOST, PORT)
server.run()
Таким образом, сервер получает определенный класс логики и обращается к нему согласно интерфейсу. Связь эта односторонняя, так как логика ничего не знает об инфраструктуре.

Это был только первый шаг. Следующим будет — отделение функционала по транспортировке сообщений (сокеты) от способа их кодирования (JSON, YML, XML).

Разделение форматирования и передачи сообщений​

Отделить логику приложения от инфраструктуры было хорошей идеей. В результате логика просто преобразуют одни команды в другие, попутно меняя свое состояние. При этом она не задумывается, откуда они берутся и куда деваются. За все это отвечает класс сервера (SocketServer). Вот сервером сейчас и займемся. Посмотрим, на какие части, в свою очередь, распадается он сам.

Заглянув внутрь него, мы увидим, что определенный способ обработки сообщений (JSON) там жестко завязан на определенный способ их передачи (TCP-сокеты). И если мы захотим использовать другой формат передачи данных, то нам заодно придется скопировать кучу кода, к формату не относящегося.

Первое решение, которое приходит на ум — реализовать данные функции в разных методах (выделить методы parse() и serialize()):

class SocketServer:
def __init__(self, logic, host, port) -> None:
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}

async def handle_connection(self, reader, writer):
self.last_index += 1
index = self.last_index
self.writer_by_index[index] = writer
print(f"[SERVER#{index}] +Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
print(f"[SERVER#{index}] EOF. Connection closed")
break
request_bytes = unparsed_bytes + request_bytes
# Handle
result, unparsed_bytes = await self.handle_bytes(index, request_bytes)
# Send response
await self.send(result)
print(f"[SERVER#{index}] -Disconnected")
del self.writer_by_index[index]
writer.close()

async def handle_bytes(self, index, request_bytes):
# Decode request
request, unparsed_bytes = self.parse(request_bytes)
# Make response
try:
# Parse request
command = json.loads(request)
# Process request
result = self.logic.handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {request} {traceback.format_exc()}")
result = [([index], [{"error": str(e)}])]
return result, unparsed_bytes

async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)

def parse(self, data_bytes):
request_bytes, unparsed_bytes = request_bytes.split(b"\x00")
request = request_bytes.decode("utf8")
return request, unparsed_bytes

def serialize(self, data):
return json.dumps(commands).encode("utf8") + b"\x00"
Теперь, например, если мы захотим изменить JSON на YML, то нам нужно всего лишь наследоваться от SocketServer и переопределить parse() и serialize(). Но при таком подходе остается все тот же недостаток при комбинировании разных функционалов. Для каждой комбинации придется создавать отдельный класс: JSONTCPSocketServer, YMLUDPSocketServer, JSONHTTPServer и так далее. Рассмотрим данный вопрос подробнее.

Код по доставке (сокеты) и код по форматированию (JSON) остается в одном классе. Это значит, что если нужно создать классы для двух типов сокетов (TCP и UDP) и трех видов форматов (JSON, YML, XML), то в итоге мы получим 2 * 3 = 6 классов для всех возможных комбинаций. Хотя должно быть по идее 2 + 3 = 5. Пусть 6 и 5 отличаются не сильно, но иметь в качестве закона возрастания кода умножение вместо сложения дает уже на следующем этапе избыточность в 33 % (3 * 3 = 9, 3 + 3 = 6). И то, что код при этом не дублируется — заслуга Python'а (множественное наследование), а не наша.

Выделение парсера как отдельного инфраструктурного слоя


Выделение парсера как отдельного инфраструктурного слоя
Поэтому лучше поступить по-нормальному и разнести доставку и первичную обработку сообщений по разным классам. В данном случае вынести из класса сервера парсер:

class SocketServer:
def __init__(self, parser, logic, host, port) -> None:
self.parser = parser
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}
# ...
async def handle_bytes(self, index, request_bytes):
try:
# Parse
commands, unparsed_bytes = self.parser.parse(request_bytes)
# Handle
result = await self.logic.handle_commands(index, commands)
except Exception as e:
result = [([index], [{"error": str(e)}])]
unparsed_bytes = b""
return result, unparsed_bytes

async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.parser.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)

class Parser:
def parse(self, data_bytes):
return data_bytes, b""

def serialize(self, data):
return data

class JSONParser(Parser):
def parse(self, data_bytes):
# Get unparsed_bytes
data_bytes, unparsed_bytes = data_bytes.rsplit(b"\x00", 1)
# bytes -> list of str
data_str = data_bytes.decode("utf8")
message_list = data_str.split("\x00")
# Parse JSON commands (suppose, a command cannot be a list)
result = []
for message in message_list:
if not message:
continue
commands = json.loads(message)
if not commands:
continue
if isinstance(commands, list):
result.extend(commands)
else:
result.append(commands)
return result, unparsed_bytes

def serialize(self, data):
if not data:
return b""
data_str = json.dumps(data)
data_bytes = data_str.encode("utf8") + b"\x00"
return data_bytes

class MyLogic:
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
code = command.get("code")
...
return result

HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), MyLogic(), HOST, PORT)
server.run()
Остановимся на кое-каких технических деталях.

Сервер может за раз принять одно сообщение, а может принять и несколько — в зависимости от того, сколько в потоке байтов присутствует разделителей. Сколько было в буфере, столько и возвращает. Да и в каждом сообщении, в принципе, можно отправлять сразу несколько команд вместо одной. Поэтому логично в парсере возвращать сразу список команд и условиться, что всегда возвращаться будет только список. Соответственно, и handle_command() будет принимать и возвращать команды только списками. Потому он и переименован теперь в handle_commands().

То, что мы передаем в парсер байты, а не декодированные в UTF-8 строки, позволяет нам реализовывать в них собственные кастомные бинарные протоколы. Разграничение сообщений нулевым символом (b"\x00") помещено в парсер по этой же причине. Если удалось распарсить команду, она возвращается. Если нет — возвращаются байты, чтобы позже к ним добавить новые и повторить попытку.

Как вы могли заметить, мы перешли от массовой адресации сообщений к точечной, о которой уже говорили выше. То есть вместо отсылки данных всем соединениям в списке writers, мы можем теперь отправлять команды по конкретным индексам соединений из словаря writer_by_index. Для этого handle_commands() к каждому списку команд добавляет еще список индексов соединений, которым они предназначены.

Чтобы по-прежнему можно было делать массовую отправку, нужно сохранять все индексы в хранилище при подключении, а при отключении — удалять их из списка. Поэтому в класс логики добавляется еще пара методов: on_connect() и on_disconnect():

class SocketServer:
# ...
async def handle_connection(self, reader, writer):
...
result = []
await self.logic.on_connect(index, result)
await self.send(result)
unparsed_bytes = b""
while True:
...
result = []
await self.logic.on_disconnect(index, result)
await self.send(result)
del self.writer_by_index[index]
writer.close()

class MyLogic:
def __init__(self):
self.storage = {}

async def on_connect(self, index, result):
indexes = self.storage.get("indexes")
if indexes is None:
self.storage["indexes"] = [index]
else:
indexes.append(index)

async def on_disconnect(self, index, result):
indexes = self.storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)

async def handle_commands(self, index, commands):
# Custom logic
result = []
all_indexes = self.storage.get("indexes")
for command in commands:
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
result.append((all_indexes, [{"success": True, **command, "state": state}]))
elif code == "set":
state = command.get("state")
storage[key] = state
result.append(([index], [{"success": True, **command}]))
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
result.append(([index], [{"success": False, **command}]))
continue
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
result.append((all_indexes, [{"success": True, **command}]))
return result

Разделение логики на контроллеры​

Допустим далее, что нам нужно сделать серверы по игре в шахматы, шашки, крестики-нолики... Для каждого создается отдельный класс логики, где переопределяется лишь один метод handle_commands() — и никакого дублирования кода. Передаем в конструктор SocketServer первым аргументом объект логики, и сервер готов к использованию:

class ChessLogic(MyLogic):
async def handle_commands(self, index, commands):
...

class CheckersLogic(MyLogic):
async def handle_commands(self, index, commands):
...

HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), CheckersLogic(), HOST, PORT)
server.run()
Но возможно, у вас уже возник закономерный вопрос. А что, если мы захотим создать сервер, где можно было бы по выбору играть и в шахматы, и в шашки? Вот тут уже придется изгаляться. Здесь нужен какой-то класс-диспетчер логики, который будет перенаправлять команды к соответствующему обработчику:

class ComboLogic:
def __init__(self) -> None:
self.chess = ChessLogic()
self.checkers = CheckersLogic()
# Use same storage for all
self.chess.storage = self.checkers.storage = self.storage = {}

async def on_connect(self, index, result):
...

async def on_disconnect(self, index, result):
...

async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
if key == "chess":
result.extend(await self.chess.handle_commands(index, commands))
elif key == "checkers":
result.extend(await self.checkers.handle_commands(index, commands))
return result

HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), ComboLogic(), HOST, PORT)
server.run()
Несложно заметить в цикле обработки команд явное дублирование кода (handle_commands() и extend()). Попробуем его устранить с помощью словаря:

class ComboLogic:
def __init__(self) -> None:
self.logic_by_key = {
"chess": ChessLogic(parser),
"checkers": CheckersLogic(parser),
}
self.storage = {}
# Provide common storage
for logic in self.logic_by_key.items():
logic.storage = self.storage
# ...
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
logic = self.logic_by_key.get(key)
if logic:
result.extend(await logic.handle_commands(i, [command]))
return result

HOST, PORT = "", 5000
if __name__ == "__main__":
logic_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
server = SocketServer(JSONParser(), ComboLogic(logic_by_key), HOST, PORT)
server.run()
Сейчас класс логики выбирается по одному из свойств команды — key. Но впоследствии, когда будут реализованы комнаты (rooms) и перемещения игроков по ним, можно будет выбирать обработчик команды по тому, в какой комнате, в какой игре находится пользователь. Если он в покер-руме, по умолчанию берется логика покера, если за шахматным столом — логика шахмат.

Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры


Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры
Налицо фактическое разделение логики на два типа классов: диспетчер (ComboLogic) и собственно реализации логики (ChessLogic, CheckersLogic). У последних из общего только интерфейс с сигнатурой метода handle_commands(). А диспетчер всегда один и тот же для всех приложений и никогда не меняется. Фактически он превратился в движок приложения, поэтому его уместнее будет переименовать в Engine, или лучше — в Application. А логику отдельных игр тогда — в контроллеры:

class Application:
def __init__(self, default_controller, controller_by_key=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = {} # App state

async def on_connect(self, index, result):
if self.default_controller:
self.default_controller.on_connect(storage, index, result)

async def on_disconnect(self, index, result):
if self.default_controller:
self.default_controller.on_disconnect(storage, index, result)

async def handle_commands(self, index, commands):
result = []
# Handle
for command in commands:
key = command.get("key")
controller = self.controller_by_key.get(key, self.default_controller)
if controller:
await controller.handle_command(self.storage, index, command, result)
return result

class MyController:
# To be able to send commands to all current connections
async def on_connect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes is None:
storage["indexes"] = [index]
else:
indexes.append(index)

async def on_disconnect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)

async def handle_command(self, storage, index, command, result):
...

class ChessController:
async def handle_command(self, storage, index, command, result):
...

class CheckersController:
async def handle_command(self, storage, index, command, result):
...

HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key)
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()
Содержимое методов on_connect() и on_disconnect() было вынесено из приложения в контроллер по умолчанию, так как данная логика весьма специфическая и может меняться от приложения к приложению. Мы не должны для этого переопределять класс Application.

Отметим также, что в контроллерах теперь не handle_commands(), а handle_command(). То есть команды обрабатываются по одной. Это удобнее, так как не нужно каждый раз делать обработку в цикле. И главное — диспетчер все равно будет передавать на обработку по одной команде, так как любая команда в массиве может требовать своего собственного обработчика.

Еще, результат больше не возвращается через return, а передается в виде списков в аргументах. Это тоже упрощает реализацию обработчиков.

И последнее. Так как контроллеры — это в сущности всего лишь часть общей логики приложения, то все они должны использовать одно общее состояние (storage). Сами контроллеры состояния не имеют и иметь не могут. Они — логика в самом чистом виде. Поэтому при каждом вызове handle_command() среди прочих аргументов передается и ссылка на состояние приложения.

Состояние​

Скажем напоследок пару слов о состоянии и о том, как оно у нас получилось таким, каким получилось.

В ООП подходе все само собой складывается так, что для каждой логической сущности создается программный объект с свойствами и методами. В свойствах хранится текущее состояние сущности, а в методах реализуются функции, которые это состояние изменяют.

При таком подходе все состояние приложения размазано тонким слоем по десяткам и сотням таких объектов. Чтобы сохранить состояние всего приложения в файл, придется обойти все объекты, собрать все их свойства и перевести в простые JSON-объекты. А чтобы загрузить, восстановить приложение из файла, придется воссоздать по иерархии JSON-объектов иерархию наших программных объектов, определить нужный класс для каждого, учитывая параметры конструкторов, и потом восстановить значения всех его свойств (даже приватных). В общем, ясно, что это очень и очень сложно и муторно.

Тут сначала может появиться идея, что все свойства объекта можно просто хранить в словаре. И не перебирать свойства объекта, когда его нужно сохранить, а просто отдавать этот словарь. Следующей мыслью возникает вопрос. А зачем нам вообще восстанавливать все эти объекты — их иерархию и внутреннее состояние? Почему не оперировать изначально чистой JSON-структурой? Тогда и объекты никакие нужны не будут, а будут одни функции. Простой набор функций.

По счастью, Python мультипарадигменный язык программирования, и на нем можно писать и в ООП-стиле, и в процедурном, и в функциональном. Мы начали с самой простой возможной реализации сервера — процедурной. Поэтому состояние у нас было изначально в отдельном словаре, общем на все приложение.

Когда мы перешли к ООП, мы сохранили использование централизованного состояния. Мы не стали его распределять по классам, потому что в этом не было никакого смысла. Классы мы применяли лишь для группировки функций и возможности подменять реализации некоторых из них в подклассах (см. паттерн шаблонный метод). (Если же мы сможем так организовать методы, чтобы вообще не менять в них состояние, то перейдем к математической концепции функций — к функциональному стилю.)

Повезло нам с состоянием? Не совсем. Все дело в методике разработки. Всегда нужно начинать с самого простой возможной версии, а потом добавлять в нее только то, без чего нельзя обойтись. Тогда про многие проблемы вы даже и не узнаете, что они бывают.

Репозиторий​

В объектах команд помимо названия действия, которое нужно выполнить, также обычно указывается объекты, которые в этом участвуют. Например, чтобы передвинуть что-то на игровом поле, нужно явно определить, что вы будете двигать и куда. Если поле имеет декартову систему координат, то объекты можно указать через координаты. В противном случае придется использовать уникальные идентификаторы (id). Впрочем идентификаторы можно часто применять и параллельно с координатами (указывать или то, или другое).

Также возможны команды, которые будут требовать обращения к свойствам объекта. Тогда поля в команде могут иметь следующий формат: "{id}.{property}". Таким же образом можно обращаться и к различным вложенным объектам, например: "id1.inner_id2.inner_id3".

Сейчас состояние реализовано простым классом dict. Поэтому в коде мы не можем просто вызывать: object = storage.get("id1").get("inner_id2").get("inner_id3"), так как какого-то промежуточного элемента может и не быть, и тогда возникнет исключение. Но и делать проверки для каждого id мы не можем, так как для этого придется добавить много "глупого" кода. Мы не хотим загромождать нашу логику разными дурацкими проверками, но и не добавить их тоже не можем. Поэтому в идеале для получения объекта должна вызываться только одна функция: object = storage.get("id1.inner_id2.inner_id3") (все проверки и прочая логика должны выполняться в ней автоматически). А для этого придется создать новый класс для хранилища:

class Storage:
def __init__(self):
self.storage = {}

def get(self, path):
return resolve_path(self.storage, path)

def set(self, path, value):
...

def update(self, path, value):
...

def delete(self, path):
...

def resolve_path(target, path=None):
if not path or target is None:
return target
current = target
keys = path.split(".")
for key in keys:
if isinstance(current, dict):
current = current.get(key)
else:
return None
return current
Аналогично get() будут выполнены и остальные методы: set(), update(), delete(). Если промежуточный вложенный объект отсутствует, то можно либо возвращать None, либо создавать пустой dict на его месте. Также можно добавить в get() значение по умолчанию (get(path, default=None)), которое будет установлено (set()), если get(path) возвращает None.

Когда у нас есть собственный класс вместо стандартного, нам становится проще добавлять в него новые функции. Например, хранилище можно синхронизировать с базой данных или делать периодическое автосохранение полностью всего состояния в файл, чтобы игру можно было восстановить при случайном падении сервера.

В Application, таким образом, можно подставлять разные реализации хранилища, лишь бы они использовали тот же интерфейс. Тогда можно выбирать нужную стратегию работы с данными (получение и хранение) без всяких изменений со стороны контроллеров. Бизнес-логика не будет даже подозревать откуда берутся данные и сохраняются ли они в БД или нет. Это не их забота.

Выделение хранилища (Repository) как отдельного слоя инфраструктуры


Выделение хранилища (Repository) как отдельного слоя инфраструктуры
В DDD такой фасад для доступа к данным, который к тому же может поддерживать их целостность и актуальность, называется репозиторием (Repository). Поэтому мы вполне можем использовать и это название вместо Storage.

Сюда же можно добавить и настройки приложения. Чтобы не путать их с состоянием, добавим для них метод: getconfig(path). Все настройки приложения можно хранить в специальном файле (лучше всего для этого подходит формат YML) и загружать при его запуске. Поэтому методы setconfig() и updateconfig() не нужны.

Начальные состояния объектов также можно хранить в настройках, откуда оно будет копироваться при создании объекта состояния. На этот случай можно также создать отдельный метод:

class Repository:
# ...
def create(self, config_path=None, initial=None):
if self.state is None:
return None
# Get initial
config = self.getconfig(config_path, {}) if config_path else {}
initial = {**config, **initial}
id = initial.get("id")
if id is None:
# Generate id
...
# Set
return self.set(id, initial)
Так как теперь может существовать несколько вариаций хранилища, мы должны иметь возможность задать одну из них при инициализации приложения:

class Application:
def __init__(self, default_controller, controller_by_key=None, storage=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = storage if storage else Repository() # App state

HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key, Repository())
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()

HTTP-сервер​

Без сокет-сервера не обойтись, если игра многопользовательская, и нужно как можно быстрее оповещать всех участников о происходящий в приложении событиях. Но если мы строим ферму или наряжаем ёлочку, то нам достаточно простого HTTP-сервера, ведь все события мы генерируем сами на клиенте. Даже если что-то происходит само на сервере, то это происходит прогнозируемым образом. А значит, в одном из сообщений сервер даст знать клиенту, в какой момент тому нужно сделать запрос, чтобы проверить, не случилось ли чего. Давайте теперь посмотрим, насколько сильно изменится наша реализация при использовании HTTP-протокола.

HTTP-сервер — это тоже сокет-сервер, но с тем отличием, что соединение разрывается сразу после отправки первого же ответного сообщения. Алгоритм его работы такой: сокет-соединение устанавливается, принимается запрос от клиента, он обрабатывается, и отсылается ответное сообщение на клиент. В конце каждой отправки соединение тут же закрывается.

Изолирование логики от инфраструктуры позволяет нам использовать логику повторно с любым типом серверов без всяких изменений в классах логики. Единственное, что будет меняться — это кое-какие слои инфраструктуры. В инфраструктуре первым делом изменяется способ транспортировки сообщений — слой Server.

HTTP — протокол стандартный и широко известный. Поэтому существует множество реализаций такого типа серверов, в том числе и на Python: Django, Twisted, Tornado. Мы выберем один из самых популярных и минималистичных — Flask. Принцип его работы можно проиллюстрировать следующим примером:

import json
from flask import Flask, send_file, request

app = Flask(__name__)

def handle(request):
return {}

@app.route("/storage/<key>")
def storage(key):
response = handle(request)
return response
Главная задача данных фреймворков — это преобразование строковых HTTP-сообщений в объекты request и response. Чтобы преобразовать эти объекты в привычные нам команды и обратно, создадим специальный FlaskParser. Допустим клиент использует RESTful API (это когда назначение запроса — взять, задать, изменить — определяется методом: get, post, patch):

class FlaskParser(Parser):
command_by_alias = {
"GET": "get",
"POST": "save",
"PATCH": "update",
}

def parse(self, request):
# Parse
values = request.values
data_str = values.get("data")
data = json.loads(data_str) if data_str else None
if data is None:
data = {}
# Prepare command
code = data.get("code")
if not code:
code = values.get("_method") or request.method
data["code"] = self.command_by_alias.get(code, code)
data["key"] = request.view_args.get("key")
return data, b""

# No real serialization needed here
def serialize(self, command):
return command
Также, поскольку HTTP-сервер запускается зачастую в нескольких процессах, которые все должны разделять общее состояние, то нам придется использовать репозиторий, хранящий данные не в памяти, а в какой-нибудь БД.

Итого, изменяется всего три слоя: сервер, парсер и репозиторий. С репозиторием ничего нового — синхронизировать данные с БД часто бывает нужно и в сокет-серверах. Класс сервера можно также использовать старый, так как там нам нужен только метод handle_bytes(). Для унификации его можно переименовать в более абстрактный handle_requests() и вынести весь код, кроме handle_connection() в базовый класс Server. В конце концов действительно уникальным для HTTP-сервера классом будет только парсер:

import json
from flask import Flask, send_file, request

app = Flask(__name__)
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
application = Application(MyController(), controller_by_key, DBRepository())
server = Server(FlaskParser(), application)

@app.route("/storage/<key>")
async def storage(key):
return await server.handle_bytes(key, request)[0]
Одно из испытаний на универсальность наша схема успешно выдержала.

Выводы​

Разбиение всего приложения на несколько независимых друг от друга слоев позволяет классы каждого из них разрабатывать отдельно от классов других слоев. Все, что от них требуется — это держаться в рамках заданных для них интерфейсов. Если интерфейсы остаются неприкосновенными, то любые изменения внутри слоя никак не отразятся на остальных. В этом и заключается вся прелесть слоистой архитектуры.

Четыре слоя инфраструктуры и их отношение к логике (Controller)


Четыре слоя инфраструктуры и их отношение к логике (Controller)
При разбиении цельного приложения на слои мы сначала выделили две основные логические части: инфраструктуру и бизнес-логику. Первая в последствии разделилась на Server и Parser. А из второй отделились еще два инфраструктурных слоя: Application и Repository. В результате данные в программе обрабатываются по следующей цепочке:

Server → Parser → Application → Controller → Repository
Инфраструктура — это все то, что не относится напрямую к логике, но помогает ей выполнять свои задачи. Будучи общей для самых разных задач, ее можно вынести в основную библиотеку классов. А поскольку эти классы также задают всю структуру приложения, составляют его каркас, то его с полным правом можно назвать фреймворком.

Отделив все вспомогательные функции и вынеся их в специальный инфраструктурный фреймворк, мы получили бизнес-логику в чистом виде — в виде контроллеров. Настолько чистом, что они не зависят даже от самих себя (т.е. друг от друга). О том, как писать бизнес-логику правильно, и на какие слои разбивается она сама, можно узнать в следующей, заключительной статье.

Исходники

 
Сверху