Source code for harp_apps.proxy.controllers

import time
from datetime import UTC, datetime
from functools import cached_property
from typing import Optional
from urllib.parse import urlencode, urljoin, urlparse

import httpx
from hishel._headers import parse_cache_control
from httpx import AsyncClient, codes
from whistle import IAsyncEventDispatcher

from harp import __parsed_version__, get_logger
from harp.asgi.events import MessageEvent, TransactionEvent
from harp.http import BaseHttpMessage, HttpError, HttpRequest, HttpResponse
from harp.http.requests import WrappedHttpRequest
from harp.models import Transaction
from harp.settings import USE_PROMETHEUS
from harp.utils.guids import generate_transaction_id_ksuid
from harp.utils.tpdex import tpdex

from .events import EVENT_TRANSACTION_ENDED, EVENT_TRANSACTION_MESSAGE, EVENT_TRANSACTION_STARTED

logger = get_logger(__name__)

_prometheus = None
if USE_PROMETHEUS:
    from prometheus_client import Counter, Histogram

    _prometheus = {
        "call": Counter("proxy_calls", "Requests to the proxy.", ["name", "method"]),
        "time.full": Histogram("proxy_time_full", "Requests to the proxy including overhead.", ["name", "method"]),
        "time.forward": Histogram("proxy_time_forward", "Forward time.", ["name", "method"]),
    }


[docs] class HttpProxyController: name: str = None """Controller name, also refered as endpoint name (for example in :class:`Transaction <harp.models.Transaction>`).""" user_agent: str = None """User agent to use when proxying requests (will default to harp/<version>).""" _dispatcher: Optional[IAsyncEventDispatcher] = None """Event dispatcher for this controller.""" url: str """Base URL to proxy requests to.""" @cached_property def dispatcher(self): """Read-only reference to the event dispatcher.""" return self._dispatcher
[docs] def __init__(self, url, *, http_client: AsyncClient, dispatcher=None, name=None, logging=True): self.http_client = http_client self.url = url or self.url self.name = name or self.name self._logging = logging self._dispatcher = dispatcher or self._dispatcher self.parsed_url = urlparse(self.url) # we only expose minimal information about the exact version if not self.user_agent: try: self.user_agent = f"harp/{__parsed_version__.major}.{__parsed_version__.minor}" except AttributeError: self.user_agent = "harp"
[docs] async def adispatch(self, event_id, event=None): """ Shortcut method to dispatch an event using the controller's dispatcher, if there is one. :return: :class:`IEvent <whistle.IEvent>` or None """ if self._dispatcher: return await self._dispatcher.adispatch(event_id, event)
[docs] def debug(self, message, *args, **kwargs): if not self._logging: return transaction: Transaction | None = kwargs.pop("transaction", None) if transaction: kwargs["transaction"] = transaction.id kwargs.update(transaction.extras) logger.debug(message, *args, **kwargs)
async def __call__(self, request: HttpRequest): """Handle an incoming request and proxy it to the configured URL. :param request: ASGI request :param response: ASGI response :return: """ prometheus_labels = (self.name or "-", request.method) before_time = time.perf_counter() if _prometheus: _prometheus["call"].labels(*prometheus_labels).inc() # create an envelope to override things, without touching the original request request = WrappedHttpRequest(request) request.headers["host"] = self.parsed_url.netloc # override user agent (later, may be customizable behavior) if self.user_agent: request.headers["user-agent"] = self.user_agent # create transaction transaction = await self._create_transaction_from_request( request, tags=self._extract_tags_from_request(request), ) await request.join() url = urljoin(self.url, request.path) + (f"?{urlencode(request.query)}" if request.query else "") # PROXY REQUEST p_request: httpx.Request = self.http_client.build_request( request.method, url, headers=list(request.headers.items()), content=request.body ) self.debug(f"▶▶ {request.method} {url}", transaction=transaction) # PROXY RESPONSE before_forward_time = time.perf_counter() try: p_response: httpx.Response = await self.http_client.send(p_request) except httpx.ConnectError as exc: if _prometheus: _prometheus["time.forward"].labels(*prometheus_labels).observe( time.perf_counter() - before_forward_time ) transaction.extras["status_class"] = "ERR" await self.end_transaction(transaction, HttpError("Unavailable", exception=exc)) # todo add web debug information if we are not on a production env return HttpResponse( "Service Unavailable (remote server unavailable)", status=503, content_type="text/plain" ) except httpx.TimeoutException as exc: if _prometheus: _prometheus["time.forward"].labels(*prometheus_labels).observe( time.perf_counter() - before_forward_time ) transaction.extras["status_class"] = "ERR" await self.end_transaction(transaction, HttpError("Timeout", exception=exc)) # todo add web debug information if we are not on a production env return HttpResponse("Gateway Timeout (remote server timeout)", status=504, content_type="text/plain") if _prometheus: _prometheus["time.forward"].labels(*prometheus_labels).observe(time.perf_counter() - before_forward_time) self.debug( f"◀◀ {p_response.status_code} {p_response.reason_phrase} ({p_response.elapsed.total_seconds()}s)", transaction=transaction, ) response_status = p_response.status_code response_headers = { k: v for k, v in p_response.headers.multi_items() if k.lower() not in ("server", "date", "content-encoding", "content-length") } # XXX for now, we use transaction "extras" to store searchable data for later transaction.extras["status_class"] = f"{response_status // 100}xx" if p_response.extensions.get("from_cache"): transaction.extras["cached"] = p_response.extensions.get("cache_metadata", {}).get("cache_key", True) response = HttpResponse(p_response.content, status=response_status, headers=response_headers) await self.end_transaction(transaction, response) if _prometheus: _prometheus["time.full"].labels(*prometheus_labels).observe( time.perf_counter() - before_time, ) return response
[docs] async def end_transaction(self, transaction, response: BaseHttpMessage): spent = int((datetime.now(UTC).timestamp() - transaction.started_at.timestamp()) * 100000) / 100 transaction.finished_at = datetime.now(UTC) transaction.elapsed = spent if isinstance(response, HttpError): logger.error(f"◀ {type(response).__name__} {response.message} ({spent}ms)", transaction_id=transaction.id) elif isinstance(response, HttpResponse): reason = codes.get_reason_phrase(response.status) self.debug(f"◀ {response.status} {reason} ({spent}ms)", transaction=transaction) else: raise ValueError(f"Invalid final message type: {type(response)}") if transaction.extras.get("status_class") == "ERR": transaction.tpdex = 0 else: transaction.tpdex = tpdex(transaction.elapsed) # dispatch message event for response # TODO delay after response is sent ? await self.adispatch(EVENT_TRANSACTION_MESSAGE, MessageEvent(transaction, response)) # dispatch transaction ended event # TODO delay after response is sent ? await self.adispatch(EVENT_TRANSACTION_ENDED, TransactionEvent(transaction))
async def _create_transaction_from_request(self, request: HttpRequest, *, tags=None): transaction = Transaction( id=generate_transaction_id_ksuid(), type="http", started_at=datetime.now(UTC), endpoint=self.name, tags=tags, ) request_cache_control = request.headers.get("cache-control") if request_cache_control: request_cache_control = parse_cache_control([request_cache_control]) self.debug(f"Cache-Control: {request_cache_control}", transaction=transaction) if request_cache_control.no_cache: transaction.extras["no_cache"] = True # XXX for now, we use transaction "extras" to store searchable data for later transaction.extras["method"] = request.method self.debug(f"▶ {request.method} {request.path}", transaction=transaction) # dispatch transaction started event # we don't really want to await this, should run in background ? or use an async queue ? await self.adispatch(EVENT_TRANSACTION_STARTED, TransactionEvent(transaction)) # dispatch message event for request await self.adispatch(EVENT_TRANSACTION_MESSAGE, MessageEvent(transaction, request)) return transaction def _extract_tags_from_request(self, request: WrappedHttpRequest): """ Convert special request headers (x-harp-*) into tags (key-value pairs) that we'll attach to the transaction. Headers are "consumed", meaning they are removed from the request headers. """ tags = {} for header in request.headers: header = header.lower() if header.startswith("x-harp-"): tags[header[7:]] = request.headers.pop(header) return tags def __repr__(self): return f"{type(self).__name__}({self.url!r}, name={self.name!r})"