Кратко про Nameko Python

Kate

Administrator
Команда форума
Nameko, как фреймворк для Python, предлагает удобные абстракции и инструменты для создания микросервисов, ориентированных на сообщения и события.

С Nameko можно получить не только инструмент для создания микросервисов, но и платформу для реализации распределенных систем с поддержкой Dependency Injection, позволяющей легко интегрировать различные компоненты приложения.

Установим через пип:

pip install nameko
Это установит Nameko и все зависимости.

После установки проверим:

nameko --version
Если выдало версию - все ок.

Встроенные расширения​

AMQP — это протокол обмена сообщениями, который Nameko использует для межсервисного взаимодействия. Встроенное расширение AMQP позволяет настроить и использовать очереди сообщений для связи между сервисами:

from nameko.messaging import Publisher, rpc

class ServiceA:
name = "service_a"

publisher = Publisher()

@rpc
def send_message(self, message):
self.publisher.publish(message)

class ServiceB:
name = "service_b"

@rpc
def process_message(self, message):
print("Received message:", message)
ServiceA отправляет сообщение через очередь, а ServiceB принимает и обрабатывает его.

DependencyProvider позволяет инжектировать зависимости в ваши сервисы, обеспечивая более чистый и модульный код. Создадим сервис, который использует БД:

from nameko.dependency_providers import DependencyProvider

class DatabaseWrapper:
def __init__(self, db_url):
self.db_url = db_url

def execute_query(self, query):
# Execute query logic here
pass

class DatabaseProvider(DependencyProvider):
def setup(self):
self.database = DatabaseWrapper(self.container.config['DATABASE_URL'])

def get_dependency(self, worker_ctx):
return self.database

class MyService:
name = "my_service"

db = DatabaseProvider()

def some_method(self):
result = self.db.execute_query("SELECT * FROM table")
return result
Юзаем DatabaseProvider для предоставления экземпляра DatabaseWrapper в сервис.

Встроенное расширение HTTP позволяет создавать веб-сервисы на базе Nameko, обеспечивая взаимодействие с внешними клиентами через HTTP. Например:

from nameko.web.handlers import http

class HttpService:
name = "http_service"

@http('GET', '/hello')
def hello_world(self, request):
return 200, "Hello, World!"
RPC также нашло свою реализацию в nameko, пример:

from nameko.rpc import rpc

class GreetingService:
name = "greeting_service"

@rpc
def greet(self, name):
return f"Hello, {name}!"

GreetingService предоставляет метод greet, который может быть вызван удаленно другими сервисами.

Events позволяет сервисам подписываться на события и реагировать на них:

from nameko.events import event_handler

class NotificationService:
name = "notification_service"

@event_handler("email_service", "email_sent")
def handle_email_sent(self, payload):
print("Email sent:", payload)
NotificationService подписывается на событие email_sent от сервиса email_service и обрабатывает его.

Timer позволяет создавать периодические задачи в приложении:

from nameko.timer import timer

class CleanupService:
name = "cleanup_service"

@timer(interval=60) # Выполнять каждую минуту
def cleanup(self):
print("Performing cleanup...")
# Логика очистки данных
Сервис будет выполнять метод cleanup каждую минуту.

Что там с тестами?​

Для написания unit-тестов в Nameko можно использовать стандартные фреймворки для тестирования Python, какpytest или unittest. Для этого создаем экземпляр сервиса в тестовой среде и вызываем его методы для проверки их поведения. Предположим, у нас есть простой сервис для работы с мат. операциями:

# math_service.py
from nameko.rpc import rpc

class MathService:
name = "math_service"

@rpc
def add(self, x, y):
return x + y
Напишем unit-тест для этого сервиса с использованием pytest:

# test_math_service.py
from math_service import MathService

def test_add():
service = MathService()
assert service.add(2, 3) == 5
Интеграционные тесты в Nameko позволяют проверить взаимодействие между различными сервисами и убедиться, что они взаимодействуют правильно в рамках системы. Для этого можно использовать механизм запуска сервисов в тестовом режиме и проверять их взаимодействие через RPC вызовы или события. Пример интеграционного теста с использованием pytest:

# test_order_service.py
from nameko.testing.services import entrypoint_hook

from order_service import OrderService
from product_service import ProductService

def test_create_order():
# создаем экземпляр сервиса OrderService
order_service = OrderService()

# создаем экземпляр сервиса ProductService
product_service = ProductService()

# входим в контекст сервиса OrderService
with entrypoint_hook(order_service, "create_order") as create_order:
# вызываем метод create_order
order_id = create_order(user_id=1, product_id=1, quantity=2)

# проверяем, что заказ успешно создан
assert order_id is not None
Иногда может потребоваться использовать mock объекты для имитации зависимостей в тестах. В Nameko это можно реализовать с помощью entrypoint механизма:

# test_email_service.py
from nameko.testing.services import entrypoint_hook
from unittest.mock import Mock

from email_service import EmailService

def test_send_email():
# создаем экземпляр сервиса EmailService
email_service = EmailService()

# создаем mock объект для отправки email
mock_send_email = Mock()

# подменяем реальный метод отправки email на mock объект
email_service.send_email = mock_send_email

# входим в контекст сервиса EmailService
with entrypoint_hook(email_service, "send_email") as send_email:
# вызываем метод send_email
send_email(recipient="test@example.com", subject="Test", body="This is a test email")

# проверяем, что метод send_email был вызван с правильными аргументами
mock_send_email.assert_called_once_with("test@example.com", "Test", "This is a test email")

Написание расширений​

Путем написания расширений можнонастраивать поведение Nameko под конкретные потребности.

Допустим, нужно добавить механизм аутентификации к сервисам Nameko. Можно написать расширение, которое будет обрабатывать проверку подлинности запросов к сервисам:

from nameko.extensions import Extension

class AuthExtension(Extension):
def __init__(self, auth_service):
self.auth_service = auth_service

def bind(self, container):
super(AuthExtension, self).bind(container)
self.container.auth = self

def get_dependency(self, worker_ctx):
return AuthServiceWrapper(self.auth_service)

class AuthServiceWrapper:
def __init__(self, auth_service):
self.auth_service = auth_service

def authenticate(self, token):
# логика аутентификации
pass
Или к примеру хотим добавить мониторинг производительности в сервисы Nameko. Можно написать расширение, которое будет собирать статистику о времени выполнения запросов и других метриках:

from nameko.extensions import Extension

class MonitoringExtension(Extension):
def __init__(self, statsd_client):
self.statsd_client = statsd_client

def bind(self, container):
super(MonitoringExtension, self).bind(container)
self.container.monitoring = self

def record_request_time(self, service_name, method_name, time_taken):
metric_name = f"{service_name}.{method_name}.request_time"
self.statsd_client.timing(metric_name, time_taken)
После написания расширения, можно использовать его в своих сервисах. Для этого просто добавляем его в контейнер Nameko:

from nameko.rpc import rpc
from my_extensions import AuthExtension

class MyService:
name = "my_service"

auth = AuthExtension()

@rpc
def my_method(self, request):
# проверка аутентификации
self.auth.authenticate(request.token)
# логика метода


 
Сверху