Допустим, у нас большие планы, и мы хотим реализовать серверную часть для всех основных игровых жанров. Однако, прежде, чем приступить к этому, нужно хорошенько подготовиться. Нужно создать такую основу, которая бы подходила для каждой игры, чтобы потом не нужно было ничего переделывать на пол-пути. В том числе и все сделанные на тот момент игры.
Главная наша задача — совместить несовместимое, выработать такое решение, которое бы позволяло создавать игровые приложения любой сложности качественно и максимально быстро! Сочетание этих несовместимых, казалось бы, условий обеспечит нам фреймворк, эволюцию которого мы и намерены здесь проследить. В первой статье будет описано создание инфраструктурного фреймворка, а во второй — разработка логики на его основе. Всего — две статьи на описание методологии разработки всей серверной части.
В качестве языка программирования выберем Python за его простоту и элегантность. Мы начнем в сокетов (asyncio), а закончим HTTP-сервером. Наша задача состоит в том, чтобы код логики не зависел от типа сервера и задействованных сетевых протоколов.
На сервере, прежде чем команды дойдут до логики, мы должны установить соединение с клиентом и принять от него сообщения. После этого мы декодируем их в объекты команд и передаем их соответствующей функции бизнес-логики на обработку. Таким образом, вокруг логики выстраивается целый массив инфраструктуры, которую нам предстоит упорядочить. Чтобы разобраться в ней самым внимательным образом, начнем с самого простого примера сокет-сервера.
Про то, как реализованы сокеты в Python, мы писали в другом месте. Тут мы продолжим с того, на чем закончили там — с TCP-сервера на asyncio. Добавим в него кодирование и обработку сообщений, а также возможность отправлять их другим клиентам. В результате чего наша первая версия программы будет выглядеть так:
last_index = 0
writers = []
async def handle_connection(reader, writer):
global last_index
global writers
writers.append(writer)
last_index += 1
index = last_index
print("+Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
break # Disconnected by client
request_bytes = unparsed_bytes + request_bytes
request_bytes_list = request_bytes.split(b"\x00")
unparsed_bytes = request_bytes_list.pop()
# Process
for request_bytes in request_bytes_list:
if not request_bytes:
continue
request = request_bytes.decode("utf8")
print(" >> Received: {repr(request)}")
try:
command = json.loads(request)
to_self_command, to_all_command = await handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {e}")
to_self_command, to_all_command = {"error": str(e)}, None
self_response = json.dumps(to_self_command) if to_self_command else None
all_response = json.dumps(to_all_command) if to_all_command else None
print(f" << Send: {repr(self_response)} as self_response and commands: "
f"{repr(all_response)} to all {len(writers)} connections")
if self_response:
to_self_bytes = self_response.encode("utf8") + b"\x00"
try:
writer.write(to_self_bytes)
await writer.drain()
except ConnectionError:
pass # Yet must send to others
if all_response:
to_all_bytes = all_response.encode("utf8") + b"\x00"
for w in writers:
try:
w.write(to_all_bytes)
except ConnectionError:
continue
await asyncio.gather(w.drain() for w in self.writers)
writers.remove(writer)
writer.close()
print(f"-Disconnected")
async def main(host, port):
print(f"Start server: {host}:{port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
HOST, PORT = "", 5554
if __name__ == "__main__":
asyncio.run(main(HOST, PORT))
Это был код, отвечающий за пересылку и первичную обработку сообщений. Бизнес-логика реализована в handle_command(). В ней реализовано всего три команды: взять, поместить и изменить (get, set, update). Так как в логике могут присутствовать запросы к базам данным или внешним сервисам, то, чтобы не блокировать выполнение программы на это время, данная функция также сделана асинхронной (async):
storage = {}
async def handle_command(index, command):
global storage
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
return {"success": True, **command, "state": state}, None
elif code == "set":
state = command.get("state")
storage[key] = state
return {"success": True, **command}, None
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
return {"success": False, **command}, None
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
return None, {"success": True, **command}
return None, None
Пока что все соединения сохраняются в простой массив writers, а в обработчике команд (handle_command()) мы различаем только два типа адресатов для ответных команд: назад к себе (to_self_command), и ко всем (to_all_command). Это самый примитивный подход и в одном из следующих примеров мы заменим его на перечисление индивидуальных адресатов по их индексам. Обработчик команд будет возвращать массив кортежей, каждый из которых в первом элементе будет содержать список индексов соединений, а во втором список команд, которые им предназначаются: result = [(indexes, commands), (indexes2, commands2)]. Соответственно, все соединения должны будут сохраняться не в списке writers, как раньше, а в словаре writer_by_index. Но пока вернемся к архитектуре приложения.
В качестве быстрого решения можно передавать ссылку на функцию с логикой (handle_command()) в параметрах handle_connection(). Или можно вынести обе функции в класс, а в подклассах их переопределять. Но лучше всего логику (обработку команд) и инфраструктуру (обмен командами по сети) вообще реализовать в двух разных классах.
Преимущество классов перед отдельными функциями в том, что в них можно хранить собственный контекст из переменных-членов (атрибутов), а также группировать и переопределять в подклассах методы.
Разделение приложения на инфраструктуру (Server) и логику (Logic)
Разделение на два класса задает два магистральных пути развития в разработке сервера. С одной стороны мы можем подготовить разные типы и версии функционала по передаче объектов (Server), с другой — наделать кучу разных игр (Logic). И все это независимо друг от друга. Единственное условие, которое должно соблюдаться, это чтобы класс логики содержал метод с такой сигнатурой: handle_command(self, index, command):
class SocketServer:
def __init__(self, logic, host, port):
self.logic = logic
self.host = host
self.port = port
self.writers = []
def run(self):
asyncio.run(self.main())
async def main(self):
print(f"Start server: {self.host}:{self.port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
async def handle_connection(self, reader, writer):
...
# Call:
# to_self_command, to_all_command = await self.logic.handle_command(index, command)
# instead of:
# to_self_command, to_all_command = await handle_command(index, command)
class MyLogic:
# global storage -> self.storage
def __init__(self):
self.storage = {}
async def handle_command(self, index, command):
...
Код запуска при этом немного изменится:
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(MyLogic(), HOST, PORT)
server.run()
Таким образом, сервер получает определенный класс логики и обращается к нему согласно интерфейсу. Связь эта односторонняя, так как логика ничего не знает об инфраструктуре.
Это был только первый шаг. Следующим будет — отделение функционала по транспортировке сообщений (сокеты) от способа их кодирования (JSON, YML, XML).
Заглянув внутрь него, мы увидим, что определенный способ обработки сообщений (JSON) там жестко завязан на определенный способ их передачи (TCP-сокеты). И если мы захотим использовать другой формат передачи данных, то нам заодно придется скопировать кучу кода, к формату не относящегося.
Первое решение, которое приходит на ум — реализовать данные функции в разных методах (выделить методы parse() и serialize()):
class SocketServer:
def __init__(self, logic, host, port) -> None:
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}
async def handle_connection(self, reader, writer):
self.last_index += 1
index = self.last_index
self.writer_by_index[index] = writer
print(f"[SERVER#{index}] +Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
print(f"[SERVER#{index}] EOF. Connection closed")
break
request_bytes = unparsed_bytes + request_bytes
# Handle
result, unparsed_bytes = await self.handle_bytes(index, request_bytes)
# Send response
await self.send(result)
print(f"[SERVER#{index}] -Disconnected")
del self.writer_by_index[index]
writer.close()
async def handle_bytes(self, index, request_bytes):
# Decode request
request, unparsed_bytes = self.parse(request_bytes)
# Make response
try:
# Parse request
command = json.loads(request)
# Process request
result = self.logic.handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {request} {traceback.format_exc()}")
result = [([index], [{"error": str(e)}])]
return result, unparsed_bytes
async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)
def parse(self, data_bytes):
request_bytes, unparsed_bytes = request_bytes.split(b"\x00")
request = request_bytes.decode("utf8")
return request, unparsed_bytes
def serialize(self, data):
return json.dumps(commands).encode("utf8") + b"\x00"
Теперь, например, если мы захотим изменить JSON на YML, то нам нужно всего лишь наследоваться от SocketServer и переопределить parse() и serialize(). Но при таком подходе остается все тот же недостаток при комбинировании разных функционалов. Для каждой комбинации придется создавать отдельный класс: JSONTCPSocketServer, YMLUDPSocketServer, JSONHTTPServer и так далее. Рассмотрим данный вопрос подробнее.
Код по доставке (сокеты) и код по форматированию (JSON) остается в одном классе. Это значит, что если нужно создать классы для двух типов сокетов (TCP и UDP) и трех видов форматов (JSON, YML, XML), то в итоге мы получим 2 * 3 = 6 классов для всех возможных комбинаций. Хотя должно быть по идее 2 + 3 = 5. Пусть 6 и 5 отличаются не сильно, но иметь в качестве закона возрастания кода умножение вместо сложения дает уже на следующем этапе избыточность в 33 % (3 * 3 = 9, 3 + 3 = 6). И то, что код при этом не дублируется — заслуга Python'а (множественное наследование), а не наша.
Выделение парсера как отдельного инфраструктурного слоя
Поэтому лучше поступить по-нормальному и разнести доставку и первичную обработку сообщений по разным классам. В данном случае вынести из класса сервера парсер:
class SocketServer:
def __init__(self, parser, logic, host, port) -> None:
self.parser = parser
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}
# ...
async def handle_bytes(self, index, request_bytes):
try:
# Parse
commands, unparsed_bytes = self.parser.parse(request_bytes)
# Handle
result = await self.logic.handle_commands(index, commands)
except Exception as e:
result = [([index], [{"error": str(e)}])]
unparsed_bytes = b""
return result, unparsed_bytes
async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.parser.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)
class Parser:
def parse(self, data_bytes):
return data_bytes, b""
def serialize(self, data):
return data
class JSONParser(Parser):
def parse(self, data_bytes):
# Get unparsed_bytes
data_bytes, unparsed_bytes = data_bytes.rsplit(b"\x00", 1)
# bytes -> list of str
data_str = data_bytes.decode("utf8")
message_list = data_str.split("\x00")
# Parse JSON commands (suppose, a command cannot be a list)
result = []
for message in message_list:
if not message:
continue
commands = json.loads(message)
if not commands:
continue
if isinstance(commands, list):
result.extend(commands)
else:
result.append(commands)
return result, unparsed_bytes
def serialize(self, data):
if not data:
return b""
data_str = json.dumps(data)
data_bytes = data_str.encode("utf8") + b"\x00"
return data_bytes
class MyLogic:
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
code = command.get("code")
...
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), MyLogic(), HOST, PORT)
server.run()
Остановимся на кое-каких технических деталях.
Сервер может за раз принять одно сообщение, а может принять и несколько — в зависимости от того, сколько в потоке байтов присутствует разделителей. Сколько было в буфере, столько и возвращает. Да и в каждом сообщении, в принципе, можно отправлять сразу несколько команд вместо одной. Поэтому логично в парсере возвращать сразу список команд и условиться, что всегда возвращаться будет только список. Соответственно, и handle_command() будет принимать и возвращать команды только списками. Потому он и переименован теперь в handle_commands().
То, что мы передаем в парсер байты, а не декодированные в UTF-8 строки, позволяет нам реализовывать в них собственные кастомные бинарные протоколы. Разграничение сообщений нулевым символом (b"\x00") помещено в парсер по этой же причине. Если удалось распарсить команду, она возвращается. Если нет — возвращаются байты, чтобы позже к ним добавить новые и повторить попытку.
Как вы могли заметить, мы перешли от массовой адресации сообщений к точечной, о которой уже говорили выше. То есть вместо отсылки данных всем соединениям в списке writers, мы можем теперь отправлять команды по конкретным индексам соединений из словаря writer_by_index. Для этого handle_commands() к каждому списку команд добавляет еще список индексов соединений, которым они предназначены.
Чтобы по-прежнему можно было делать массовую отправку, нужно сохранять все индексы в хранилище при подключении, а при отключении — удалять их из списка. Поэтому в класс логики добавляется еще пара методов: on_connect() и on_disconnect():
class SocketServer:
# ...
async def handle_connection(self, reader, writer):
...
result = []
await self.logic.on_connect(index, result)
await self.send(result)
unparsed_bytes = b""
while True:
...
result = []
await self.logic.on_disconnect(index, result)
await self.send(result)
del self.writer_by_index[index]
writer.close()
class MyLogic:
def __init__(self):
self.storage = {}
async def on_connect(self, index, result):
indexes = self.storage.get("indexes")
if indexes is None:
self.storage["indexes"] = [index]
else:
indexes.append(index)
async def on_disconnect(self, index, result):
indexes = self.storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)
async def handle_commands(self, index, commands):
# Custom logic
result = []
all_indexes = self.storage.get("indexes")
for command in commands:
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
result.append((all_indexes, [{"success": True, **command, "state": state}]))
elif code == "set":
state = command.get("state")
storage[key] = state
result.append(([index], [{"success": True, **command}]))
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
result.append(([index], [{"success": False, **command}]))
continue
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
result.append((all_indexes, [{"success": True, **command}]))
return result
class ChessLogic(MyLogic):
async def handle_commands(self, index, commands):
...
class CheckersLogic(MyLogic):
async def handle_commands(self, index, commands):
...
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), CheckersLogic(), HOST, PORT)
server.run()
Но возможно, у вас уже возник закономерный вопрос. А что, если мы захотим создать сервер, где можно было бы по выбору играть и в шахматы, и в шашки? Вот тут уже придется изгаляться. Здесь нужен какой-то класс-диспетчер логики, который будет перенаправлять команды к соответствующему обработчику:
class ComboLogic:
def __init__(self) -> None:
self.chess = ChessLogic()
self.checkers = CheckersLogic()
# Use same storage for all
self.chess.storage = self.checkers.storage = self.storage = {}
async def on_connect(self, index, result):
...
async def on_disconnect(self, index, result):
...
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
if key == "chess":
result.extend(await self.chess.handle_commands(index, commands))
elif key == "checkers":
result.extend(await self.checkers.handle_commands(index, commands))
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), ComboLogic(), HOST, PORT)
server.run()
Несложно заметить в цикле обработки команд явное дублирование кода (handle_commands() и extend()). Попробуем его устранить с помощью словаря:
class ComboLogic:
def __init__(self) -> None:
self.logic_by_key = {
"chess": ChessLogic(parser),
"checkers": CheckersLogic(parser),
}
self.storage = {}
# Provide common storage
for logic in self.logic_by_key.items():
logic.storage = self.storage
# ...
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
logic = self.logic_by_key.get(key)
if logic:
result.extend(await logic.handle_commands(i, [command]))
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
logic_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
server = SocketServer(JSONParser(), ComboLogic(logic_by_key), HOST, PORT)
server.run()
Сейчас класс логики выбирается по одному из свойств команды — key. Но впоследствии, когда будут реализованы комнаты (rooms) и перемещения игроков по ним, можно будет выбирать обработчик команды по тому, в какой комнате, в какой игре находится пользователь. Если он в покер-руме, по умолчанию берется логика покера, если за шахматным столом — логика шахмат.
Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры
Налицо фактическое разделение логики на два типа классов: диспетчер (ComboLogic) и собственно реализации логики (ChessLogic, CheckersLogic). У последних из общего только интерфейс с сигнатурой метода handle_commands(). А диспетчер всегда один и тот же для всех приложений и никогда не меняется. Фактически он превратился в движок приложения, поэтому его уместнее будет переименовать в Engine, или лучше — в Application. А логику отдельных игр тогда — в контроллеры:
class Application:
def __init__(self, default_controller, controller_by_key=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = {} # App state
async def on_connect(self, index, result):
if self.default_controller:
self.default_controller.on_connect(storage, index, result)
async def on_disconnect(self, index, result):
if self.default_controller:
self.default_controller.on_disconnect(storage, index, result)
async def handle_commands(self, index, commands):
result = []
# Handle
for command in commands:
key = command.get("key")
controller = self.controller_by_key.get(key, self.default_controller)
if controller:
await controller.handle_command(self.storage, index, command, result)
return result
class MyController:
# To be able to send commands to all current connections
async def on_connect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes is None:
storage["indexes"] = [index]
else:
indexes.append(index)
async def on_disconnect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)
async def handle_command(self, storage, index, command, result):
...
class ChessController:
async def handle_command(self, storage, index, command, result):
...
class CheckersController:
async def handle_command(self, storage, index, command, result):
...
HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key)
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()
Содержимое методов on_connect() и on_disconnect() было вынесено из приложения в контроллер по умолчанию, так как данная логика весьма специфическая и может меняться от приложения к приложению. Мы не должны для этого переопределять класс Application.
Отметим также, что в контроллерах теперь не handle_commands(), а handle_command(). То есть команды обрабатываются по одной. Это удобнее, так как не нужно каждый раз делать обработку в цикле. И главное — диспетчер все равно будет передавать на обработку по одной команде, так как любая команда в массиве может требовать своего собственного обработчика.
Еще, результат больше не возвращается через return, а передается в виде списков в аргументах. Это тоже упрощает реализацию обработчиков.
И последнее. Так как контроллеры — это в сущности всего лишь часть общей логики приложения, то все они должны использовать одно общее состояние (storage). Сами контроллеры состояния не имеют и иметь не могут. Они — логика в самом чистом виде. Поэтому при каждом вызове handle_command() среди прочих аргументов передается и ссылка на состояние приложения.
В ООП подходе все само собой складывается так, что для каждой логической сущности создается программный объект с свойствами и методами. В свойствах хранится текущее состояние сущности, а в методах реализуются функции, которые это состояние изменяют.
При таком подходе все состояние приложения размазано тонким слоем по десяткам и сотням таких объектов. Чтобы сохранить состояние всего приложения в файл, придется обойти все объекты, собрать все их свойства и перевести в простые JSON-объекты. А чтобы загрузить, восстановить приложение из файла, придется воссоздать по иерархии JSON-объектов иерархию наших программных объектов, определить нужный класс для каждого, учитывая параметры конструкторов, и потом восстановить значения всех его свойств (даже приватных). В общем, ясно, что это очень и очень сложно и муторно.
Тут сначала может появиться идея, что все свойства объекта можно просто хранить в словаре. И не перебирать свойства объекта, когда его нужно сохранить, а просто отдавать этот словарь. Следующей мыслью возникает вопрос. А зачем нам вообще восстанавливать все эти объекты — их иерархию и внутреннее состояние? Почему не оперировать изначально чистой JSON-структурой? Тогда и объекты никакие нужны не будут, а будут одни функции. Простой набор функций.
По счастью, Python мультипарадигменный язык программирования, и на нем можно писать и в ООП-стиле, и в процедурном, и в функциональном. Мы начали с самой простой возможной реализации сервера — процедурной. Поэтому состояние у нас было изначально в отдельном словаре, общем на все приложение.
Когда мы перешли к ООП, мы сохранили использование централизованного состояния. Мы не стали его распределять по классам, потому что в этом не было никакого смысла. Классы мы применяли лишь для группировки функций и возможности подменять реализации некоторых из них в подклассах (см. паттерн шаблонный метод). (Если же мы сможем так организовать методы, чтобы вообще не менять в них состояние, то перейдем к математической концепции функций — к функциональному стилю.)
Повезло нам с состоянием? Не совсем. Все дело в методике разработки. Всегда нужно начинать с самого простой возможной версии, а потом добавлять в нее только то, без чего нельзя обойтись. Тогда про многие проблемы вы даже и не узнаете, что они бывают.
Также возможны команды, которые будут требовать обращения к свойствам объекта. Тогда поля в команде могут иметь следующий формат: "{id}.{property}". Таким же образом можно обращаться и к различным вложенным объектам, например: "id1.inner_id2.inner_id3".
Сейчас состояние реализовано простым классом dict. Поэтому в коде мы не можем просто вызывать: object = storage.get("id1").get("inner_id2").get("inner_id3"), так как какого-то промежуточного элемента может и не быть, и тогда возникнет исключение. Но и делать проверки для каждого id мы не можем, так как для этого придется добавить много "глупого" кода. Мы не хотим загромождать нашу логику разными дурацкими проверками, но и не добавить их тоже не можем. Поэтому в идеале для получения объекта должна вызываться только одна функция: object = storage.get("id1.inner_id2.inner_id3") (все проверки и прочая логика должны выполняться в ней автоматически). А для этого придется создать новый класс для хранилища:
class Storage:
def __init__(self):
self.storage = {}
def get(self, path):
return resolve_path(self.storage, path)
def set(self, path, value):
...
def update(self, path, value):
...
def delete(self, path):
...
def resolve_path(target, path=None):
if not path or target is None:
return target
current = target
keys = path.split(".")
for key in keys:
if isinstance(current, dict):
current = current.get(key)
else:
return None
return current
Аналогично get() будут выполнены и остальные методы: set(), update(), delete(). Если промежуточный вложенный объект отсутствует, то можно либо возвращать None, либо создавать пустой dict на его месте. Также можно добавить в get() значение по умолчанию (get(path, default=None)), которое будет установлено (set()), если get(path) возвращает None.
Когда у нас есть собственный класс вместо стандартного, нам становится проще добавлять в него новые функции. Например, хранилище можно синхронизировать с базой данных или делать периодическое автосохранение полностью всего состояния в файл, чтобы игру можно было восстановить при случайном падении сервера.
В Application, таким образом, можно подставлять разные реализации хранилища, лишь бы они использовали тот же интерфейс. Тогда можно выбирать нужную стратегию работы с данными (получение и хранение) без всяких изменений со стороны контроллеров. Бизнес-логика не будет даже подозревать откуда берутся данные и сохраняются ли они в БД или нет. Это не их забота.
Выделение хранилища (Repository) как отдельного слоя инфраструктуры
В DDD такой фасад для доступа к данным, который к тому же может поддерживать их целостность и актуальность, называется репозиторием (Repository). Поэтому мы вполне можем использовать и это название вместо Storage.
Сюда же можно добавить и настройки приложения. Чтобы не путать их с состоянием, добавим для них метод: getconfig(path). Все настройки приложения можно хранить в специальном файле (лучше всего для этого подходит формат YML) и загружать при его запуске. Поэтому методы setconfig() и updateconfig() не нужны.
Начальные состояния объектов также можно хранить в настройках, откуда оно будет копироваться при создании объекта состояния. На этот случай можно также создать отдельный метод:
class Repository:
# ...
def create(self, config_path=None, initial=None):
if self.state is None:
return None
# Get initial
config = self.getconfig(config_path, {}) if config_path else {}
initial = {**config, **initial}
id = initial.get("id")
if id is None:
# Generate id
...
# Set
return self.set(id, initial)
Так как теперь может существовать несколько вариаций хранилища, мы должны иметь возможность задать одну из них при инициализации приложения:
class Application:
def __init__(self, default_controller, controller_by_key=None, storage=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = storage if storage else Repository() # App state
HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key, Repository())
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()
HTTP-сервер — это тоже сокет-сервер, но с тем отличием, что соединение разрывается сразу после отправки первого же ответного сообщения. Алгоритм его работы такой: сокет-соединение устанавливается, принимается запрос от клиента, он обрабатывается, и отсылается ответное сообщение на клиент. В конце каждой отправки соединение тут же закрывается.
Изолирование логики от инфраструктуры позволяет нам использовать логику повторно с любым типом серверов без всяких изменений в классах логики. Единственное, что будет меняться — это кое-какие слои инфраструктуры. В инфраструктуре первым делом изменяется способ транспортировки сообщений — слой Server.
HTTP — протокол стандартный и широко известный. Поэтому существует множество реализаций такого типа серверов, в том числе и на Python: Django, Twisted, Tornado. Мы выберем один из самых популярных и минималистичных — Flask. Принцип его работы можно проиллюстрировать следующим примером:
import json
from flask import Flask, send_file, request
app = Flask(__name__)
def handle(request):
return {}
@app.route("/storage/<key>")
def storage(key):
response = handle(request)
return response
Главная задача данных фреймворков — это преобразование строковых HTTP-сообщений в объекты request и response. Чтобы преобразовать эти объекты в привычные нам команды и обратно, создадим специальный FlaskParser. Допустим клиент использует RESTful API (это когда назначение запроса — взять, задать, изменить — определяется методом: get, post, patch):
class FlaskParser(Parser):
command_by_alias = {
"GET": "get",
"POST": "save",
"PATCH": "update",
}
def parse(self, request):
# Parse
values = request.values
data_str = values.get("data")
data = json.loads(data_str) if data_str else None
if data is None:
data = {}
# Prepare command
code = data.get("code")
if not code:
code = values.get("_method") or request.method
data["code"] = self.command_by_alias.get(code, code)
data["key"] = request.view_args.get("key")
return data, b""
# No real serialization needed here
def serialize(self, command):
return command
Также, поскольку HTTP-сервер запускается зачастую в нескольких процессах, которые все должны разделять общее состояние, то нам придется использовать репозиторий, хранящий данные не в памяти, а в какой-нибудь БД.
Итого, изменяется всего три слоя: сервер, парсер и репозиторий. С репозиторием ничего нового — синхронизировать данные с БД часто бывает нужно и в сокет-серверах. Класс сервера можно также использовать старый, так как там нам нужен только метод handle_bytes(). Для унификации его можно переименовать в более абстрактный handle_requests() и вынести весь код, кроме handle_connection() в базовый класс Server. В конце концов действительно уникальным для HTTP-сервера классом будет только парсер:
import json
from flask import Flask, send_file, request
app = Flask(__name__)
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
application = Application(MyController(), controller_by_key, DBRepository())
server = Server(FlaskParser(), application)
@app.route("/storage/<key>")
async def storage(key):
return await server.handle_bytes(key, request)[0]
Одно из испытаний на универсальность наша схема успешно выдержала.
Четыре слоя инфраструктуры и их отношение к логике (Controller)
При разбиении цельного приложения на слои мы сначала выделили две основные логические части: инфраструктуру и бизнес-логику. Первая в последствии разделилась на Server и Parser. А из второй отделились еще два инфраструктурных слоя: Application и Repository. В результате данные в программе обрабатываются по следующей цепочке:
Отделив все вспомогательные функции и вынеся их в специальный инфраструктурный фреймворк, мы получили бизнес-логику в чистом виде — в виде контроллеров. Настолько чистом, что они не зависят даже от самих себя (т.е. друг от друга). О том, как писать бизнес-логику правильно, и на какие слои разбивается она сама, можно узнать в следующей, заключительной статье.
Исходники
Главная наша задача — совместить несовместимое, выработать такое решение, которое бы позволяло создавать игровые приложения любой сложности качественно и максимально быстро! Сочетание этих несовместимых, казалось бы, условий обеспечит нам фреймворк, эволюцию которого мы и намерены здесь проследить. В первой статье будет описано создание инфраструктурного фреймворка, а во второй — разработка логики на его основе. Всего — две статьи на описание методологии разработки всей серверной части.
В качестве языка программирования выберем Python за его простоту и элегантность. Мы начнем в сокетов (asyncio), а закончим HTTP-сервером. Наша задача состоит в том, чтобы код логики не зависел от типа сервера и задействованных сетевых протоколов.
Исходная точка
При разборе клиента мы проследили, как из отображения выделяется логика — сначала модель, а потом и контроллер. Потом мы все игровые действия вместо вызовов методов сделали простыми объектами-командами. Так как отображение и логика взаимодействуют исключительно такими командами, то отображению становится все равно, где находится логика — в том же приложении или в другом, запущенном на иной машине, расположенной за тысячи километров. На клиенте контроллер без логики, таким образом, превращается в простой шлюз между отображением и логикой, между клиентом и сервером.На сервере, прежде чем команды дойдут до логики, мы должны установить соединение с клиентом и принять от него сообщения. После этого мы декодируем их в объекты команд и передаем их соответствующей функции бизнес-логики на обработку. Таким образом, вокруг логики выстраивается целый массив инфраструктуры, которую нам предстоит упорядочить. Чтобы разобраться в ней самым внимательным образом, начнем с самого простого примера сокет-сервера.
Про то, как реализованы сокеты в Python, мы писали в другом месте. Тут мы продолжим с того, на чем закончили там — с TCP-сервера на asyncio. Добавим в него кодирование и обработку сообщений, а также возможность отправлять их другим клиентам. В результате чего наша первая версия программы будет выглядеть так:
last_index = 0
writers = []
async def handle_connection(reader, writer):
global last_index
global writers
writers.append(writer)
last_index += 1
index = last_index
print("+Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
break # Disconnected by client
request_bytes = unparsed_bytes + request_bytes
request_bytes_list = request_bytes.split(b"\x00")
unparsed_bytes = request_bytes_list.pop()
# Process
for request_bytes in request_bytes_list:
if not request_bytes:
continue
request = request_bytes.decode("utf8")
print(" >> Received: {repr(request)}")
try:
command = json.loads(request)
to_self_command, to_all_command = await handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {e}")
to_self_command, to_all_command = {"error": str(e)}, None
self_response = json.dumps(to_self_command) if to_self_command else None
all_response = json.dumps(to_all_command) if to_all_command else None
print(f" << Send: {repr(self_response)} as self_response and commands: "
f"{repr(all_response)} to all {len(writers)} connections")
if self_response:
to_self_bytes = self_response.encode("utf8") + b"\x00"
try:
writer.write(to_self_bytes)
await writer.drain()
except ConnectionError:
pass # Yet must send to others
if all_response:
to_all_bytes = all_response.encode("utf8") + b"\x00"
for w in writers:
try:
w.write(to_all_bytes)
except ConnectionError:
continue
await asyncio.gather(w.drain() for w in self.writers)
writers.remove(writer)
writer.close()
print(f"-Disconnected")
async def main(host, port):
print(f"Start server: {host}:{port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
HOST, PORT = "", 5554
if __name__ == "__main__":
asyncio.run(main(HOST, PORT))
Это был код, отвечающий за пересылку и первичную обработку сообщений. Бизнес-логика реализована в handle_command(). В ней реализовано всего три команды: взять, поместить и изменить (get, set, update). Так как в логике могут присутствовать запросы к базам данным или внешним сервисам, то, чтобы не блокировать выполнение программы на это время, данная функция также сделана асинхронной (async):
storage = {}
async def handle_command(index, command):
global storage
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
return {"success": True, **command, "state": state}, None
elif code == "set":
state = command.get("state")
storage[key] = state
return {"success": True, **command}, None
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
return {"success": False, **command}, None
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
return None, {"success": True, **command}
return None, None
Пока что все соединения сохраняются в простой массив writers, а в обработчике команд (handle_command()) мы различаем только два типа адресатов для ответных команд: назад к себе (to_self_command), и ко всем (to_all_command). Это самый примитивный подход и в одном из следующих примеров мы заменим его на перечисление индивидуальных адресатов по их индексам. Обработчик команд будет возвращать массив кортежей, каждый из которых в первом элементе будет содержать список индексов соединений, а во втором список команд, которые им предназначаются: result = [(indexes, commands), (indexes2, commands2)]. Соответственно, все соединения должны будут сохраняться не в списке writers, как раньше, а в словаре writer_by_index. Но пока вернемся к архитектуре приложения.
Разделение логики и инфраструктуры
Сейчас наше приложение цельное и неразделимое, а потому максимально "нереюзабельное". Другими словами, мы не можем использовать отдельные его части повторно, так как все они жестко связаны друг с другом. Хоть логика и вынесена в отдельную функцию handle_command(), но в handle_connection() вместо нее нельзя подставить другую. Если мы захотим написать новое приложение с другой логикой, то нам придется копировать и handle_connection(). Конечно, нас это не устраивает.В качестве быстрого решения можно передавать ссылку на функцию с логикой (handle_command()) в параметрах handle_connection(). Или можно вынести обе функции в класс, а в подклассах их переопределять. Но лучше всего логику (обработку команд) и инфраструктуру (обмен командами по сети) вообще реализовать в двух разных классах.
Преимущество классов перед отдельными функциями в том, что в них можно хранить собственный контекст из переменных-членов (атрибутов), а также группировать и переопределять в подклассах методы.
Разделение приложения на инфраструктуру (Server) и логику (Logic)
Разделение на два класса задает два магистральных пути развития в разработке сервера. С одной стороны мы можем подготовить разные типы и версии функционала по передаче объектов (Server), с другой — наделать кучу разных игр (Logic). И все это независимо друг от друга. Единственное условие, которое должно соблюдаться, это чтобы класс логики содержал метод с такой сигнатурой: handle_command(self, index, command):
class SocketServer:
def __init__(self, logic, host, port):
self.logic = logic
self.host = host
self.port = port
self.writers = []
def run(self):
asyncio.run(self.main())
async def main(self):
print(f"Start server: {self.host}:{self.port}")
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
async def handle_connection(self, reader, writer):
...
# Call:
# to_self_command, to_all_command = await self.logic.handle_command(index, command)
# instead of:
# to_self_command, to_all_command = await handle_command(index, command)
class MyLogic:
# global storage -> self.storage
def __init__(self):
self.storage = {}
async def handle_command(self, index, command):
...
Код запуска при этом немного изменится:
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(MyLogic(), HOST, PORT)
server.run()
Таким образом, сервер получает определенный класс логики и обращается к нему согласно интерфейсу. Связь эта односторонняя, так как логика ничего не знает об инфраструктуре.
Это был только первый шаг. Следующим будет — отделение функционала по транспортировке сообщений (сокеты) от способа их кодирования (JSON, YML, XML).
Разделение форматирования и передачи сообщений
Отделить логику приложения от инфраструктуры было хорошей идеей. В результате логика просто преобразуют одни команды в другие, попутно меняя свое состояние. При этом она не задумывается, откуда они берутся и куда деваются. За все это отвечает класс сервера (SocketServer). Вот сервером сейчас и займемся. Посмотрим, на какие части, в свою очередь, распадается он сам.Заглянув внутрь него, мы увидим, что определенный способ обработки сообщений (JSON) там жестко завязан на определенный способ их передачи (TCP-сокеты). И если мы захотим использовать другой формат передачи данных, то нам заодно придется скопировать кучу кода, к формату не относящегося.
Первое решение, которое приходит на ум — реализовать данные функции в разных методах (выделить методы parse() и serialize()):
class SocketServer:
def __init__(self, logic, host, port) -> None:
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}
async def handle_connection(self, reader, writer):
self.last_index += 1
index = self.last_index
self.writer_by_index[index] = writer
print(f"[SERVER#{index}] +Connected")
unparsed_bytes = b""
while True:
# Receive
try:
request_bytes = await reader.read(1024)
except ConnectionError:
break
if reader.at_eof():
print(f"[SERVER#{index}] EOF. Connection closed")
break
request_bytes = unparsed_bytes + request_bytes
# Handle
result, unparsed_bytes = await self.handle_bytes(index, request_bytes)
# Send response
await self.send(result)
print(f"[SERVER#{index}] -Disconnected")
del self.writer_by_index[index]
writer.close()
async def handle_bytes(self, index, request_bytes):
# Decode request
request, unparsed_bytes = self.parse(request_bytes)
# Make response
try:
# Parse request
command = json.loads(request)
# Process request
result = self.logic.handle_command(index, command)
except Exception as e:
print(f"[SERVER#{index}] Error while parsing or processing: {request} {traceback.format_exc()}")
result = [([index], [{"error": str(e)}])]
return result, unparsed_bytes
async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)
def parse(self, data_bytes):
request_bytes, unparsed_bytes = request_bytes.split(b"\x00")
request = request_bytes.decode("utf8")
return request, unparsed_bytes
def serialize(self, data):
return json.dumps(commands).encode("utf8") + b"\x00"
Теперь, например, если мы захотим изменить JSON на YML, то нам нужно всего лишь наследоваться от SocketServer и переопределить parse() и serialize(). Но при таком подходе остается все тот же недостаток при комбинировании разных функционалов. Для каждой комбинации придется создавать отдельный класс: JSONTCPSocketServer, YMLUDPSocketServer, JSONHTTPServer и так далее. Рассмотрим данный вопрос подробнее.
Код по доставке (сокеты) и код по форматированию (JSON) остается в одном классе. Это значит, что если нужно создать классы для двух типов сокетов (TCP и UDP) и трех видов форматов (JSON, YML, XML), то в итоге мы получим 2 * 3 = 6 классов для всех возможных комбинаций. Хотя должно быть по идее 2 + 3 = 5. Пусть 6 и 5 отличаются не сильно, но иметь в качестве закона возрастания кода умножение вместо сложения дает уже на следующем этапе избыточность в 33 % (3 * 3 = 9, 3 + 3 = 6). И то, что код при этом не дублируется — заслуга Python'а (множественное наследование), а не наша.
Выделение парсера как отдельного инфраструктурного слоя
Поэтому лучше поступить по-нормальному и разнести доставку и первичную обработку сообщений по разным классам. В данном случае вынести из класса сервера парсер:
class SocketServer:
def __init__(self, parser, logic, host, port) -> None:
self.parser = parser
self.logic = logic
self.host = host
self.port = port
self.last_index = 0
self.writer_by_index = {}
# ...
async def handle_bytes(self, index, request_bytes):
try:
# Parse
commands, unparsed_bytes = self.parser.parse(request_bytes)
# Handle
result = await self.logic.handle_commands(index, commands)
except Exception as e:
result = [([index], [{"error": str(e)}])]
unparsed_bytes = b""
return result, unparsed_bytes
async def send(self, result):
if not result:
return
# Serialize
result = [(indexes, self.parser.serialize(commands))
for indexes, commands in result]
# Send
wait_writers = []
for indexes, response_bytes in result:
for i in indexes:
writer = self.writer_by_index.get(i)
if writer:
try:
writer.write(response_bytes)
wait_writers.append(writer)
except ConnectionError:
continue
await asyncio.gather(writer.drain() for writer in wait_writers)
class Parser:
def parse(self, data_bytes):
return data_bytes, b""
def serialize(self, data):
return data
class JSONParser(Parser):
def parse(self, data_bytes):
# Get unparsed_bytes
data_bytes, unparsed_bytes = data_bytes.rsplit(b"\x00", 1)
# bytes -> list of str
data_str = data_bytes.decode("utf8")
message_list = data_str.split("\x00")
# Parse JSON commands (suppose, a command cannot be a list)
result = []
for message in message_list:
if not message:
continue
commands = json.loads(message)
if not commands:
continue
if isinstance(commands, list):
result.extend(commands)
else:
result.append(commands)
return result, unparsed_bytes
def serialize(self, data):
if not data:
return b""
data_str = json.dumps(data)
data_bytes = data_str.encode("utf8") + b"\x00"
return data_bytes
class MyLogic:
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
code = command.get("code")
...
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), MyLogic(), HOST, PORT)
server.run()
Остановимся на кое-каких технических деталях.
Сервер может за раз принять одно сообщение, а может принять и несколько — в зависимости от того, сколько в потоке байтов присутствует разделителей. Сколько было в буфере, столько и возвращает. Да и в каждом сообщении, в принципе, можно отправлять сразу несколько команд вместо одной. Поэтому логично в парсере возвращать сразу список команд и условиться, что всегда возвращаться будет только список. Соответственно, и handle_command() будет принимать и возвращать команды только списками. Потому он и переименован теперь в handle_commands().
То, что мы передаем в парсер байты, а не декодированные в UTF-8 строки, позволяет нам реализовывать в них собственные кастомные бинарные протоколы. Разграничение сообщений нулевым символом (b"\x00") помещено в парсер по этой же причине. Если удалось распарсить команду, она возвращается. Если нет — возвращаются байты, чтобы позже к ним добавить новые и повторить попытку.
Как вы могли заметить, мы перешли от массовой адресации сообщений к точечной, о которой уже говорили выше. То есть вместо отсылки данных всем соединениям в списке writers, мы можем теперь отправлять команды по конкретным индексам соединений из словаря writer_by_index. Для этого handle_commands() к каждому списку команд добавляет еще список индексов соединений, которым они предназначены.
Чтобы по-прежнему можно было делать массовую отправку, нужно сохранять все индексы в хранилище при подключении, а при отключении — удалять их из списка. Поэтому в класс логики добавляется еще пара методов: on_connect() и on_disconnect():
class SocketServer:
# ...
async def handle_connection(self, reader, writer):
...
result = []
await self.logic.on_connect(index, result)
await self.send(result)
unparsed_bytes = b""
while True:
...
result = []
await self.logic.on_disconnect(index, result)
await self.send(result)
del self.writer_by_index[index]
writer.close()
class MyLogic:
def __init__(self):
self.storage = {}
async def on_connect(self, index, result):
indexes = self.storage.get("indexes")
if indexes is None:
self.storage["indexes"] = [index]
else:
indexes.append(index)
async def on_disconnect(self, index, result):
indexes = self.storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)
async def handle_commands(self, index, commands):
# Custom logic
result = []
all_indexes = self.storage.get("indexes")
for command in commands:
key = command.get("key")
code = command.get("code")
if code == "get":
state = storage.get(key)
result.append((all_indexes, [{"success": True, **command, "state": state}]))
elif code == "set":
state = command.get("state")
storage[key] = state
result.append(([index], [{"success": True, **command}]))
elif code == "update":
index = command.get("index")
value = command.get("value")
if not isinstance(index, int) or not isinstance(value, int):
result.append(([index], [{"success": False, **command}]))
continue
state = storage.get(key)
if state is None:
storage[key] = state = []
if index >= len(state):
state += [0] * (index - len(state) + 1)
state[index] = value
result.append((all_indexes, [{"success": True, **command}]))
return result
Разделение логики на контроллеры
Допустим далее, что нам нужно сделать серверы по игре в шахматы, шашки, крестики-нолики... Для каждого создается отдельный класс логики, где переопределяется лишь один метод handle_commands() — и никакого дублирования кода. Передаем в конструктор SocketServer первым аргументом объект логики, и сервер готов к использованию:class ChessLogic(MyLogic):
async def handle_commands(self, index, commands):
...
class CheckersLogic(MyLogic):
async def handle_commands(self, index, commands):
...
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), CheckersLogic(), HOST, PORT)
server.run()
Но возможно, у вас уже возник закономерный вопрос. А что, если мы захотим создать сервер, где можно было бы по выбору играть и в шахматы, и в шашки? Вот тут уже придется изгаляться. Здесь нужен какой-то класс-диспетчер логики, который будет перенаправлять команды к соответствующему обработчику:
class ComboLogic:
def __init__(self) -> None:
self.chess = ChessLogic()
self.checkers = CheckersLogic()
# Use same storage for all
self.chess.storage = self.checkers.storage = self.storage = {}
async def on_connect(self, index, result):
...
async def on_disconnect(self, index, result):
...
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
if key == "chess":
result.extend(await self.chess.handle_commands(index, commands))
elif key == "checkers":
result.extend(await self.checkers.handle_commands(index, commands))
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
server = SocketServer(JSONParser(), ComboLogic(), HOST, PORT)
server.run()
Несложно заметить в цикле обработки команд явное дублирование кода (handle_commands() и extend()). Попробуем его устранить с помощью словаря:
class ComboLogic:
def __init__(self) -> None:
self.logic_by_key = {
"chess": ChessLogic(parser),
"checkers": CheckersLogic(parser),
}
self.storage = {}
# Provide common storage
for logic in self.logic_by_key.items():
logic.storage = self.storage
# ...
async def handle_commands(self, index, commands):
# Custom logic
result = []
for command in commands:
key = command.get("key")
logic = self.logic_by_key.get(key)
if logic:
result.extend(await logic.handle_commands(i, [command]))
return result
HOST, PORT = "", 5000
if __name__ == "__main__":
logic_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
server = SocketServer(JSONParser(), ComboLogic(logic_by_key), HOST, PORT)
server.run()
Сейчас класс логики выбирается по одному из свойств команды — key. Но впоследствии, когда будут реализованы комнаты (rooms) и перемещения игроков по ним, можно будет выбирать обработчик команды по тому, в какой комнате, в какой игре находится пользователь. Если он в покер-руме, по умолчанию берется логика покера, если за шахматным столом — логика шахмат.
Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры
Налицо фактическое разделение логики на два типа классов: диспетчер (ComboLogic) и собственно реализации логики (ChessLogic, CheckersLogic). У последних из общего только интерфейс с сигнатурой метода handle_commands(). А диспетчер всегда один и тот же для всех приложений и никогда не меняется. Фактически он превратился в движок приложения, поэтому его уместнее будет переименовать в Engine, или лучше — в Application. А логику отдельных игр тогда — в контроллеры:
class Application:
def __init__(self, default_controller, controller_by_key=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = {} # App state
async def on_connect(self, index, result):
if self.default_controller:
self.default_controller.on_connect(storage, index, result)
async def on_disconnect(self, index, result):
if self.default_controller:
self.default_controller.on_disconnect(storage, index, result)
async def handle_commands(self, index, commands):
result = []
# Handle
for command in commands:
key = command.get("key")
controller = self.controller_by_key.get(key, self.default_controller)
if controller:
await controller.handle_command(self.storage, index, command, result)
return result
class MyController:
# To be able to send commands to all current connections
async def on_connect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes is None:
storage["indexes"] = [index]
else:
indexes.append(index)
async def on_disconnect(self, storage, index, result):
indexes = storage.get("indexes")
if indexes and index in indexes:
indexes.remove(index)
async def handle_command(self, storage, index, command, result):
...
class ChessController:
async def handle_command(self, storage, index, command, result):
...
class CheckersController:
async def handle_command(self, storage, index, command, result):
...
HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key)
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()
Содержимое методов on_connect() и on_disconnect() было вынесено из приложения в контроллер по умолчанию, так как данная логика весьма специфическая и может меняться от приложения к приложению. Мы не должны для этого переопределять класс Application.
Отметим также, что в контроллерах теперь не handle_commands(), а handle_command(). То есть команды обрабатываются по одной. Это удобнее, так как не нужно каждый раз делать обработку в цикле. И главное — диспетчер все равно будет передавать на обработку по одной команде, так как любая команда в массиве может требовать своего собственного обработчика.
Еще, результат больше не возвращается через return, а передается в виде списков в аргументах. Это тоже упрощает реализацию обработчиков.
И последнее. Так как контроллеры — это в сущности всего лишь часть общей логики приложения, то все они должны использовать одно общее состояние (storage). Сами контроллеры состояния не имеют и иметь не могут. Они — логика в самом чистом виде. Поэтому при каждом вызове handle_command() среди прочих аргументов передается и ссылка на состояние приложения.
Состояние
Скажем напоследок пару слов о состоянии и о том, как оно у нас получилось таким, каким получилось.В ООП подходе все само собой складывается так, что для каждой логической сущности создается программный объект с свойствами и методами. В свойствах хранится текущее состояние сущности, а в методах реализуются функции, которые это состояние изменяют.
При таком подходе все состояние приложения размазано тонким слоем по десяткам и сотням таких объектов. Чтобы сохранить состояние всего приложения в файл, придется обойти все объекты, собрать все их свойства и перевести в простые JSON-объекты. А чтобы загрузить, восстановить приложение из файла, придется воссоздать по иерархии JSON-объектов иерархию наших программных объектов, определить нужный класс для каждого, учитывая параметры конструкторов, и потом восстановить значения всех его свойств (даже приватных). В общем, ясно, что это очень и очень сложно и муторно.
Тут сначала может появиться идея, что все свойства объекта можно просто хранить в словаре. И не перебирать свойства объекта, когда его нужно сохранить, а просто отдавать этот словарь. Следующей мыслью возникает вопрос. А зачем нам вообще восстанавливать все эти объекты — их иерархию и внутреннее состояние? Почему не оперировать изначально чистой JSON-структурой? Тогда и объекты никакие нужны не будут, а будут одни функции. Простой набор функций.
По счастью, Python мультипарадигменный язык программирования, и на нем можно писать и в ООП-стиле, и в процедурном, и в функциональном. Мы начали с самой простой возможной реализации сервера — процедурной. Поэтому состояние у нас было изначально в отдельном словаре, общем на все приложение.
Когда мы перешли к ООП, мы сохранили использование централизованного состояния. Мы не стали его распределять по классам, потому что в этом не было никакого смысла. Классы мы применяли лишь для группировки функций и возможности подменять реализации некоторых из них в подклассах (см. паттерн шаблонный метод). (Если же мы сможем так организовать методы, чтобы вообще не менять в них состояние, то перейдем к математической концепции функций — к функциональному стилю.)
Повезло нам с состоянием? Не совсем. Все дело в методике разработки. Всегда нужно начинать с самого простой возможной версии, а потом добавлять в нее только то, без чего нельзя обойтись. Тогда про многие проблемы вы даже и не узнаете, что они бывают.
Репозиторий
В объектах команд помимо названия действия, которое нужно выполнить, также обычно указывается объекты, которые в этом участвуют. Например, чтобы передвинуть что-то на игровом поле, нужно явно определить, что вы будете двигать и куда. Если поле имеет декартову систему координат, то объекты можно указать через координаты. В противном случае придется использовать уникальные идентификаторы (id). Впрочем идентификаторы можно часто применять и параллельно с координатами (указывать или то, или другое).Также возможны команды, которые будут требовать обращения к свойствам объекта. Тогда поля в команде могут иметь следующий формат: "{id}.{property}". Таким же образом можно обращаться и к различным вложенным объектам, например: "id1.inner_id2.inner_id3".
Сейчас состояние реализовано простым классом dict. Поэтому в коде мы не можем просто вызывать: object = storage.get("id1").get("inner_id2").get("inner_id3"), так как какого-то промежуточного элемента может и не быть, и тогда возникнет исключение. Но и делать проверки для каждого id мы не можем, так как для этого придется добавить много "глупого" кода. Мы не хотим загромождать нашу логику разными дурацкими проверками, но и не добавить их тоже не можем. Поэтому в идеале для получения объекта должна вызываться только одна функция: object = storage.get("id1.inner_id2.inner_id3") (все проверки и прочая логика должны выполняться в ней автоматически). А для этого придется создать новый класс для хранилища:
class Storage:
def __init__(self):
self.storage = {}
def get(self, path):
return resolve_path(self.storage, path)
def set(self, path, value):
...
def update(self, path, value):
...
def delete(self, path):
...
def resolve_path(target, path=None):
if not path or target is None:
return target
current = target
keys = path.split(".")
for key in keys:
if isinstance(current, dict):
current = current.get(key)
else:
return None
return current
Аналогично get() будут выполнены и остальные методы: set(), update(), delete(). Если промежуточный вложенный объект отсутствует, то можно либо возвращать None, либо создавать пустой dict на его месте. Также можно добавить в get() значение по умолчанию (get(path, default=None)), которое будет установлено (set()), если get(path) возвращает None.
Когда у нас есть собственный класс вместо стандартного, нам становится проще добавлять в него новые функции. Например, хранилище можно синхронизировать с базой данных или делать периодическое автосохранение полностью всего состояния в файл, чтобы игру можно было восстановить при случайном падении сервера.
В Application, таким образом, можно подставлять разные реализации хранилища, лишь бы они использовали тот же интерфейс. Тогда можно выбирать нужную стратегию работы с данными (получение и хранение) без всяких изменений со стороны контроллеров. Бизнес-логика не будет даже подозревать откуда берутся данные и сохраняются ли они в БД или нет. Это не их забота.
Выделение хранилища (Repository) как отдельного слоя инфраструктуры
В DDD такой фасад для доступа к данным, который к тому же может поддерживать их целостность и актуальность, называется репозиторием (Repository). Поэтому мы вполне можем использовать и это название вместо Storage.
Сюда же можно добавить и настройки приложения. Чтобы не путать их с состоянием, добавим для них метод: getconfig(path). Все настройки приложения можно хранить в специальном файле (лучше всего для этого подходит формат YML) и загружать при его запуске. Поэтому методы setconfig() и updateconfig() не нужны.
Начальные состояния объектов также можно хранить в настройках, откуда оно будет копироваться при создании объекта состояния. На этот случай можно также создать отдельный метод:
class Repository:
# ...
def create(self, config_path=None, initial=None):
if self.state is None:
return None
# Get initial
config = self.getconfig(config_path, {}) if config_path else {}
initial = {**config, **initial}
id = initial.get("id")
if id is None:
# Generate id
...
# Set
return self.set(id, initial)
Так как теперь может существовать несколько вариаций хранилища, мы должны иметь возможность задать одну из них при инициализации приложения:
class Application:
def __init__(self, default_controller, controller_by_key=None, storage=None) -> None:
self.default_controller = default_controller
self.controller_by_key = controller_by_key or {}
self.storage = storage if storage else Repository() # App state
HOST, PORT = "", 5000
if __name__ == "__main__":
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
app = Application(MyController(), controller_by_key, Repository())
server = SocketServer(JSONParser(), app, HOST, PORT)
server.run()
HTTP-сервер
Без сокет-сервера не обойтись, если игра многопользовательская, и нужно как можно быстрее оповещать всех участников о происходящий в приложении событиях. Но если мы строим ферму или наряжаем ёлочку, то нам достаточно простого HTTP-сервера, ведь все события мы генерируем сами на клиенте. Даже если что-то происходит само на сервере, то это происходит прогнозируемым образом. А значит, в одном из сообщений сервер даст знать клиенту, в какой момент тому нужно сделать запрос, чтобы проверить, не случилось ли чего. Давайте теперь посмотрим, насколько сильно изменится наша реализация при использовании HTTP-протокола.HTTP-сервер — это тоже сокет-сервер, но с тем отличием, что соединение разрывается сразу после отправки первого же ответного сообщения. Алгоритм его работы такой: сокет-соединение устанавливается, принимается запрос от клиента, он обрабатывается, и отсылается ответное сообщение на клиент. В конце каждой отправки соединение тут же закрывается.
Изолирование логики от инфраструктуры позволяет нам использовать логику повторно с любым типом серверов без всяких изменений в классах логики. Единственное, что будет меняться — это кое-какие слои инфраструктуры. В инфраструктуре первым делом изменяется способ транспортировки сообщений — слой Server.
HTTP — протокол стандартный и широко известный. Поэтому существует множество реализаций такого типа серверов, в том числе и на Python: Django, Twisted, Tornado. Мы выберем один из самых популярных и минималистичных — Flask. Принцип его работы можно проиллюстрировать следующим примером:
import json
from flask import Flask, send_file, request
app = Flask(__name__)
def handle(request):
return {}
@app.route("/storage/<key>")
def storage(key):
response = handle(request)
return response
Главная задача данных фреймворков — это преобразование строковых HTTP-сообщений в объекты request и response. Чтобы преобразовать эти объекты в привычные нам команды и обратно, создадим специальный FlaskParser. Допустим клиент использует RESTful API (это когда назначение запроса — взять, задать, изменить — определяется методом: get, post, patch):
class FlaskParser(Parser):
command_by_alias = {
"GET": "get",
"POST": "save",
"PATCH": "update",
}
def parse(self, request):
# Parse
values = request.values
data_str = values.get("data")
data = json.loads(data_str) if data_str else None
if data is None:
data = {}
# Prepare command
code = data.get("code")
if not code:
code = values.get("_method") or request.method
data["code"] = self.command_by_alias.get(code, code)
data["key"] = request.view_args.get("key")
return data, b""
# No real serialization needed here
def serialize(self, command):
return command
Также, поскольку HTTP-сервер запускается зачастую в нескольких процессах, которые все должны разделять общее состояние, то нам придется использовать репозиторий, хранящий данные не в памяти, а в какой-нибудь БД.
Итого, изменяется всего три слоя: сервер, парсер и репозиторий. С репозиторием ничего нового — синхронизировать данные с БД часто бывает нужно и в сокет-серверах. Класс сервера можно также использовать старый, так как там нам нужен только метод handle_bytes(). Для унификации его можно переименовать в более абстрактный handle_requests() и вынести весь код, кроме handle_connection() в базовый класс Server. В конце концов действительно уникальным для HTTP-сервера классом будет только парсер:
import json
from flask import Flask, send_file, request
app = Flask(__name__)
controller_by_key = {
"chess": ChessLogic(),
"checkers": CheckersLogic(),
}
application = Application(MyController(), controller_by_key, DBRepository())
server = Server(FlaskParser(), application)
@app.route("/storage/<key>")
async def storage(key):
return await server.handle_bytes(key, request)[0]
Одно из испытаний на универсальность наша схема успешно выдержала.
Выводы
Разбиение всего приложения на несколько независимых друг от друга слоев позволяет классы каждого из них разрабатывать отдельно от классов других слоев. Все, что от них требуется — это держаться в рамках заданных для них интерфейсов. Если интерфейсы остаются неприкосновенными, то любые изменения внутри слоя никак не отразятся на остальных. В этом и заключается вся прелесть слоистой архитектуры.Четыре слоя инфраструктуры и их отношение к логике (Controller)
При разбиении цельного приложения на слои мы сначала выделили две основные логические части: инфраструктуру и бизнес-логику. Первая в последствии разделилась на Server и Parser. А из второй отделились еще два инфраструктурных слоя: Application и Repository. В результате данные в программе обрабатываются по следующей цепочке:
Инфраструктура — это все то, что не относится напрямую к логике, но помогает ей выполнять свои задачи. Будучи общей для самых разных задач, ее можно вынести в основную библиотеку классов. А поскольку эти классы также задают всю структуру приложения, составляют его каркас, то его с полным правом можно назвать фреймворком.Server → Parser → Application → Controller → Repository
Отделив все вспомогательные функции и вынеся их в специальный инфраструктурный фреймворк, мы получили бизнес-логику в чистом виде — в виде контроллеров. Настолько чистом, что они не зависят даже от самих себя (т.е. друг от друга). О том, как писать бизнес-логику правильно, и на какие слои разбивается она сама, можно узнать в следующей, заключительной статье.
Исходники
Эволюция игрового фреймворка. Сервер на Python. Часть 1. Слои инфраструктуры
Допустим, у нас большие планы, и мы хотим реализовать серверную часть для всех основных игровых жанров. Однако, прежде, чем приступить к этому, нужно хорошенько подготовиться. Нужно создать такую...
habr.com