http2.py

  1. import threading
  2. import time
  3. import functools
  4. import types
  5. from typing import Dict, Callable, Any, List, Optional  # noqa
  6.  
  7. import h2.exceptions
  8. from h2 import connection
  9. from h2 import events
  10. import queue
  11.  
  12. from mitmproxy import connections, flow  # noqa
  13. from mitmproxy import exceptions
  14. from mitmproxy import http
  15. from mitmproxy.proxy.protocol import base
  16. from mitmproxy.proxy.protocol import http as httpbase
  17. import mitmproxy.net.http
  18. from mitmproxy.net import tcp
  19. from mitmproxy.coretypes import basethread
  20. from mitmproxy.net.http import http2, headers, url
  21. from mitmproxy.utils import human
  22.  
  23.  
  24. class SafeH2Connection(connection.H2Connection):
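    # Thread-safety wrapper around h2's H2Connection: every proxied stream runs in its
    # own thread but shares one connection-level state machine, so each safe_* helper
    # takes the RLock, applies the state change, and immediately flushes data_to_send()
    # to the underlying socket.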
  25.  
  26.     def __init__(self, conn, *args, **kwargs):
  27.         super().__init__(*args, **kwargs)
  28.         self.conn = conn
  29.         self.lock = threading.RLock()
  30.  
  31.     def safe_acknowledge_received_data(self, acknowledged_size: int, stream_id: int):
  32.         if acknowledged_size == 0:
  33.             return
  34.  
  35.         with self.lock:
  36.             self.acknowledge_received_data(acknowledged_size, stream_id)
  37.             self.conn.send(self.data_to_send())
  38.  
  39.     def safe_reset_stream(self, stream_id: int, error_code: int):
  40.         with self.lock:
  41.             try:
  42.                 self.reset_stream(stream_id, error_code)
  43.             except h2.exceptions.StreamClosedError:  # pragma: no cover
  44.                 # stream is already closed - good
  45.                 pass
  46.             self.conn.send(self.data_to_send())
  47.  
  48.     def safe_update_settings(self, new_settings: Dict[int, Any]):
  49.         with self.lock:
  50.             self.update_settings(new_settings)
  51.             self.conn.send(self.data_to_send())
  52.  
  53.     def safe_send_headers(self, raise_zombie: Callable, stream_id: int, headers: headers.Headers, **kwargs):
  54.         with self.lock:
  55.             raise_zombie()
  56.             self.send_headers(stream_id, headers.fields, **kwargs)
  57.             self.conn.send(self.data_to_send())
  58.  
  59.     def safe_send_body(self, raise_zombie: Callable, stream_id: int, chunks: List[bytes], end_stream=True):
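        # Send the body in frames of at most max_outbound_frame_size. If the stream's
        # flow-control window is too small for the next frame, release the lock, back
        # off for 100 ms and retry; raise_zombie(self.lock.release) aborts early
        # (releasing the lock) if the stream or connection has died in the meantime.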
  60.         for chunk in chunks:
  61.             position = 0
  62.             while position < len(chunk):
  63.                 self.lock.acquire()
  64.                 raise_zombie(self.lock.release)
  65.                 max_outbound_frame_size = self.max_outbound_frame_size
  66.                 frame_chunk = chunk[position:position + max_outbound_frame_size]
  67.                 if self.local_flow_control_window(stream_id) < len(frame_chunk):  # pragma: no cover
  68.                     self.lock.release()
  69.                     time.sleep(0.1)
  70.                     continue
  71.                 self.send_data(stream_id, frame_chunk)
  72.                 try:
  73.                     self.conn.send(self.data_to_send())
  74.                 except Exception as e:  # pragma: no cover
  75.                     raise e
  76.                 finally:
  77.                     self.lock.release()
  78.                 position += max_outbound_frame_size
  79.         if end_stream:
  80.             with self.lock:
  81.                 raise_zombie()
  82.                 self.end_stream(stream_id)
  83.                 self.conn.send(self.data_to_send())
  84.  
  85.  
  86. class Http2Layer(base.Layer):
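    # Connection-level layer: owns one SafeH2Connection per side (client and server),
    # reads frames in __call__(), and fans the resulting h2 events out to the
    # per-stream Http2SingleStreamLayer worker threads kept in self.streams.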
  87.  
  88.     if False:
  89.         # mypy type hints
  90.         client_conn: connections.ClientConnection = None
  91.  
  92.     class H2ConnLogger:
  93.         def __init__(self, name, log):
  94.             self.name = name
  95.             self.log = log
  96.  
  97.         def debug(self, fmtstr, *args):
  98.             # self.log("H2Conn {}: {}".format(self.name, fmtstr % args)+"BH - break point 98 of ### http2.py ## ", "info")
  99.             msg = "H2Conn {}: {}".format(self.name, fmtstr % args)
  100.             self.log(msg, "debug")
  101.             # self.log("BH - break point 99 of #################### http2.py &&&&&&&&&&########## ", "info")
  102.  
  103.         def trace(self, fmtstr, *args):
  104.             pass
  105.  
  106.     def __init__(self, ctx, mode: str) -> None:
  107.         super().__init__(ctx)
  108.         self.mode = mode
  109.         self.streams: Dict[int, Http2SingleStreamLayer] = dict()
  110.         self.server_to_client_stream_ids: Dict[int, int] = dict([(0, 0)])
  111.         self.connections: Dict[object, SafeH2Connection] = {}
  112.  
  113.         config = h2.config.H2Configuration(
  114.             client_side=False,
  115.             header_encoding=False,
  116.             validate_outbound_headers=False,
  117.             validate_inbound_headers=False,
  118.             logger=self.H2ConnLogger("client", self.log))
  119.         self.connections[self.client_conn] = SafeH2Connection(self.client_conn, config=config)
  120.  
  121.     def _initiate_server_conn(self):
  122.         if self.server_conn.connected():
  123.             config = h2.config.H2Configuration(
  124.                 client_side=True,
  125.                 header_encoding=False,
  126.                 validate_outbound_headers=False,
  127.                 validate_inbound_headers=False,
  128.                 logger=self.H2ConnLogger("server", self.log))
  129.             self.connections[self.server_conn] = SafeH2Connection(self.server_conn, config=config)
  130.         self.connections[self.server_conn].initiate_connection()
  131.         self.server_conn.send(self.connections[self.server_conn].data_to_send())
  132.  
  133.     def _complete_handshake(self):
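        # Read the 24-byte HTTP/2 client connection preface ("PRI * HTTP/2.0 ...") and
        # reply with this proxy's SETTINGS frame produced by initiate_connection().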
  134.         preamble = self.client_conn.rfile.read(24)
  135.         self.connections[self.client_conn].initiate_connection()
  136.         self.connections[self.client_conn].receive_data(preamble)
  137.         self.client_conn.send(self.connections[self.client_conn].data_to_send())
  138.  
  139.     def next_layer(self):  # pragma: no cover
  140.         # WebSocket over HTTP/2?
  141.         # CONNECT for proxying?
  142.         raise NotImplementedError()
  143.  
  144.     def _handle_event(self, event, source_conn, other_conn, is_server):
  145.         self.log("BH - break point 145 of #################### http2.py &&&&&&&&&&########## ", "info")
  146.         self.log(
  147.             "HTTP2 Event from {}".format("server" if is_server else "client"),
  148.             "debug",
  149.             [repr(event)]
  150.         )
  151.         self.log("BH - break point 151 of #################### http2.py &&&&&&&&&&########## ", "info")
  152.  
  153.         eid = None
  154.         if hasattr(event, 'stream_id'):
  155.             if is_server and event.stream_id % 2 == 1:
  156.                 eid = self.server_to_client_stream_ids[event.stream_id]
  157.             else:
  158.                 eid = event.stream_id
  159.         self.log("BH - break point 158 of #################### http2.py &&&&&&&&&&########## ", "info")
  160.         if isinstance(event, events.RequestReceived):
  161.             self.log("BH - break point 160 of #################### http2.py &&&&&&&&&&########## ", "info")
  162.             return self._handle_request_received(eid, event)
  163.         elif isinstance(event, events.ResponseReceived):
  164.             self.log("BH - break point 163 of #################### http2.py &&&&&&&&&&########## ", "info")
  165.             return self._handle_response_received(eid, event)
  166.         elif isinstance(event, events.DataReceived):
  167.             self.log("BH - break point 166 of #################### http2.py &&&&&&&&&&########## ", "info")
  168.             return self._handle_data_received(eid, event, source_conn)
  169.         elif isinstance(event, events.StreamEnded):
  170.             self.log("BH - break point 169 of #################### http2.py &&&&&&&&&&########## ", "info")
  171.             return self._handle_stream_ended(eid)
  172.         elif isinstance(event, events.StreamReset):
  173.             self.log("BH - break point 172 of #################### http2.py &&&&&&&&&&########## ", "info")
  174.             return self._handle_stream_reset(eid, event, is_server, other_conn)
  175.         elif isinstance(event, events.RemoteSettingsChanged):
  176.             self.log("BH - break point 175 of #################### http2.py &&&&&&&&&&########## ", "info")
  177.             return self._handle_remote_settings_changed(event, other_conn)
  178.         elif isinstance(event, events.ConnectionTerminated):
  179.             self.log("BH - break point 178 of #################### http2.py &&&&&&&&&&########## ", "info")
  180.             return self._handle_connection_terminated(event, is_server)
  181.         elif isinstance(event, events.PushedStreamReceived):
  182.             self.log("BH - break point 181 of #################### http2.py &&&&&&&&&&########## ", "info")
  183.             return self._handle_pushed_stream_received(event)
  184.         elif isinstance(event, events.PriorityUpdated):
  185.             self.log("BH - break point 184 of #################### http2.py &&&&&&&&&&########## ", "info")
  186.             return self._handle_priority_updated(eid, event)
  187.         elif isinstance(event, events.TrailersReceived):
  188.             self.log("BH - break point 187 of #################### http2.py &&&&&&&&&&########## ", "info")
  189.             return self._handle_trailers(eid, event, is_server, other_conn)
  190.         self.log("BH - break point 189 of #################### http2.py &&&&&&&&&&########## ", "info")
  191.         # fail-safe for unhandled events
  192.         return True
  193.  
  194.     def _handle_request_received(self, eid, event):
  195.         headers = mitmproxy.net.http.Headers([[k, v] for k, v in event.headers])
  196.         self.streams[eid] = Http2SingleStreamLayer(self, self.connections[self.client_conn], eid, headers)
  197.         self.streams[eid].timestamp_start = time.time()
  198.         if event.priority_updated is not None:
  199.             self.streams[eid].priority_exclusive = event.priority_updated.exclusive
  200.             self.streams[eid].priority_depends_on = event.priority_updated.depends_on
  201.             self.streams[eid].priority_weight = event.priority_updated.weight
  202.             self.streams[eid].handled_priority_event = event.priority_updated
  203.         self.streams[eid].start()
  204.         self.streams[eid].request_message.arrived.set()
  205.         return True
  206.  
  207.     def _handle_response_received(self, eid, event):
  208.         headers = mitmproxy.net.http.Headers([[k, v] for k, v in event.headers])
  209.         self.streams[eid].queued_data_length = 0
  210.         self.streams[eid].timestamp_start = time.time()
  211.         self.streams[eid].response_message.headers = headers
  212.         self.streams[eid].response_message.arrived.set()
  213.         return True
  214.  
  215.     def _handle_data_received(self, eid, event, source_conn):
  216.         bsl = human.parse_size(self.config.options.body_size_limit)
  217.         if bsl and self.streams[eid].queued_data_length > bsl:
  218.             self.streams[eid].kill()
  219.             self.connections[source_conn].safe_reset_stream(
  220.                 event.stream_id,
  221.                 h2.errors.ErrorCodes.REFUSED_STREAM
  222.             )
  223.             self.log(f"HTTP body too large. Limit is {bsl}.", "info")
  224.         else:
  225.             self.streams[eid].data_queue.put(event.data)
  226.             self.streams[eid].queued_data_length += len(event.data)
  227.  
  228.         # always acknowledge received data with a WINDOW_UPDATE frame
  229.         self.connections[source_conn].safe_acknowledge_received_data(
  230.             event.flow_controlled_length,
  231.             event.stream_id
  232.         )
  233.         return True
  234.  
  235.     def _handle_stream_ended(self, eid):
  236.         self.log("BH - break point 235 of #################### http2.py &&&&&&&&&&########## ", "info")
  237.         self.streams[eid].timestamp_end = time.time()
  238.         self.log("BH - break point 237 of #################### http2.py &&&&&&&&&&########## ", "info")
  239.         self.streams[eid].stream_ended.set()
  240.         self.log("BH - break point 239 of #################### http2.py &&&&&&&&&&########## ", "info")
  241.         return True
  242.  
  243.     def _handle_stream_reset(self, eid, event, is_server, other_conn):
  244.         if eid in self.streams:
  245.             self.streams[eid].kill()
  246.             if is_server:
  247.                 other_stream_id = self.streams[eid].client_stream_id
  248.             else:
  249.                 other_stream_id = self.streams[eid].server_stream_id
  250.             if other_stream_id is not None:
  251.                 self.connections[other_conn].safe_reset_stream(other_stream_id, event.error_code)
  252.         return True
  253.  
  254.     def _handle_trailers(self, eid, event, is_server, other_conn):
  255.         trailers = mitmproxy.net.http.Headers([[k, v] for k, v in event.headers])
  256.         self.streams[eid].trailers = trailers
  257.         return True
  258.  
  259.     def _handle_remote_settings_changed(self, event, other_conn):
  260.         new_settings = {key: cs.new_value for (key, cs) in event.changed_settings.items()}
  261.         self.connections[other_conn].safe_update_settings(new_settings)
  262.         return True
  263.  
  264.     def _handle_connection_terminated(self, event, is_server):
  265.         self.log("HTTP/2 connection terminated by {}: error code: {}, last stream id: {}, additional data: {}".format(
  266.             "server" if is_server else "client",
  267.             event.error_code,
  268.             event.last_stream_id,
  269.             event.additional_data), "info")
  270.  
  271.         if event.error_code != h2.errors.ErrorCodes.NO_ERROR:
  272.             # Something terrible has happened - kill everything!
  273.             self.connections[self.client_conn].close_connection(
  274.                 error_code=event.error_code,
  275.                 last_stream_id=event.last_stream_id,
  276.                 additional_data=event.additional_data
  277.             )
  278.             self.client_conn.send(self.connections[self.client_conn].data_to_send())
  279.             self._kill_all_streams()
  280.         else:
  281.             """
  282.             Do not immediately terminate the other connection.
  283.             Some streams might still be sending data to the client.
  284.             """
  285.         return False
  286.  
  287.     def _handle_pushed_stream_received(self, event):
  288.         # pushed stream ids should be unique and not dependent on race conditions
  289.         # only the parent stream id must be looked up first
  290.  
  291.         parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
  292.         with self.connections[self.client_conn].lock:
  293.             self.connections[self.client_conn].push_stream(parent_eid, event.pushed_stream_id, event.headers)
  294.             self.client_conn.send(self.connections[self.client_conn].data_to_send())
  295.  
  296.         headers = mitmproxy.net.http.Headers([[k, v] for k, v in event.headers])
  297.         layer = Http2SingleStreamLayer(self, self.connections[self.client_conn], event.pushed_stream_id, headers)
  298.         self.streams[event.pushed_stream_id] = layer
  299.         self.streams[event.pushed_stream_id].timestamp_start = time.time()
  300.         self.streams[event.pushed_stream_id].pushed = True
  301.         self.streams[event.pushed_stream_id].parent_stream_id = parent_eid
  302.         self.streams[event.pushed_stream_id].timestamp_end = time.time()
  303.         self.streams[event.pushed_stream_id].request_message.arrived.set()
  304.         self.streams[event.pushed_stream_id].request_message.stream_ended.set()
  305.         self.streams[event.pushed_stream_id].start()
  306.         return True
  307.  
  308.     def _handle_priority_updated(self, eid, event):
  309.         if not self.config.options.http2_priority:
  310.             self.log("HTTP/2 PRIORITY frame suppressed. Use --http2-priority to enable forwarding.", "debug")
  311.             return True
  312.  
  313.         if eid in self.streams and self.streams[eid].handled_priority_event is event:
  314.             # this event was already handled during stream creation
  315.             # HeadersFrame + Priority information as RequestReceived
  316.             return True
  317.  
  318.         with self.connections[self.server_conn].lock:
  319.             mapped_stream_id = event.stream_id
  320.             if mapped_stream_id in self.streams and self.streams[mapped_stream_id].server_stream_id:
  321.                 # if the stream is already up and running and was sent to the server,
  322.                 # use the mapped server stream id to update priority information
  323.                 mapped_stream_id = self.streams[mapped_stream_id].server_stream_id
  324.  
  325.             if eid in self.streams:
  326.                 self.streams[eid].priority_exclusive = event.exclusive
  327.                 self.streams[eid].priority_depends_on = event.depends_on
  328.                 self.streams[eid].priority_weight = event.weight
  329.  
  330.             self.connections[self.server_conn].prioritize(
  331.                 mapped_stream_id,
  332.                 weight=event.weight,
  333.                 depends_on=self._map_depends_on_stream_id(mapped_stream_id, event.depends_on),
  334.                 exclusive=event.exclusive
  335.             )
  336.             self.server_conn.send(self.connections[self.server_conn].data_to_send())
  337.         return True
  338.  
  339.     def _map_depends_on_stream_id(self, stream_id, depends_on):
  340.         mapped_depends_on = depends_on
  341.         if mapped_depends_on in self.streams and self.streams[mapped_depends_on].server_stream_id:
  342.             # if the depends-on-stream is already up and running and was sent to the server
  343.             # use the mapped server stream id to update priority information
  344.             mapped_depends_on = self.streams[mapped_depends_on].server_stream_id
  345.         if stream_id == mapped_depends_on:
  346.             # looks like one of the streams wasn't opened yet
  347.             # prevent self-dependent streams which result in ProtocolError
  348.             mapped_depends_on += 2
  349.         return mapped_depends_on
  350.  
  351.     def _cleanup_streams(self):
  352.         death_time = time.time() - 10
  353.  
  354.         zombie_streams = [(stream_id, stream) for stream_id, stream in list(self.streams.items()) if stream.zombie]
  355.         outdated_streams = [stream_id for stream_id, stream in zombie_streams if stream.zombie <= death_time]
  356.  
  357.         for stream_id in outdated_streams:  # pragma: no cover
  358.             self.streams.pop(stream_id, None)
  359.  
  360.     def _kill_all_streams(self):
  361.         for stream in self.streams.values():
  362.             stream.kill()
  363.  
  364.     def __call__(self):
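        # Main loop: select over both TLS sockets, read one frame at a time, feed it to
        # the matching h2 state machine, flush whatever it wants to send back, and
        # dispatch the resulting events via _handle_event(). Zombie stream threads are
        # reaped by _cleanup_streams() after a 10 second grace period.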
  365.         self._initiate_server_conn()
  366.         self._complete_handshake()
  367.         self.log("BH - break point 366 of #################### http2.py &&&&&&&&&&########## ", "info")
  368.         conns = [c.connection for c in self.connections.keys()]
  369.         self.log("BH - break point 368 of #################### http2.py &&&&&&&&&&########## ", "info")
  370.         try:
  371.             while True:
  372.                 r = tcp.ssl_read_select(conns, 0.1)
  373.                 for conn in r:
  374.                     source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
  375.                     other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
  376.                     is_server = (source_conn == self.server_conn)
  377.                     self.log("BH - break point 376 of #################### http2.py &&&&&&&&&&########## ", "info")
  378.                     with self.connections[source_conn].lock:
  379.                         try:
  380.                             _, consumed_bytes = http2.read_frame(source_conn.rfile)
  381.                         except Exception:
  382.                             # read frame failed: connection closed
  383.                             self._kill_all_streams()
  384.                             return
  385.                         self.log("BH - break point 384 of #################### http2.py &&&&&&&&&&########## ", "info")
  386.                         if self.connections[source_conn].state_machine.state == h2.connection.ConnectionState.CLOSED:
  387.                             self.log("HTTP/2 connection entered closed state already", "debug")
  388.                             return
  389.                         self.log("BH - break point 388 of #################### http2.py &&&&&&&&&&########## ", "info")
  390.                         incoming_events = self.connections[source_conn].receive_data(consumed_bytes)
  391.                         self.log("BH - break point 390 of #################### http2.py &&&&&&&&&&########## ", "info")
  392.                         source_conn.send(self.connections[source_conn].data_to_send())
  393.                         self.log("BH - break point 392 of #################### http2.py &&&&&&&&&&########## ", "info")
  394.  
  395.                         for event in incoming_events:
  396.                             self.log("BH - break point 395 of #################### http2.py &&&&&&&&&&########## ",
  397.                                      "info")
  398.                             if not self._handle_event(event, source_conn, other_conn, is_server):
  399.                                 self.log("BH - break point 398 of #################### http2.py &&&&&&&&&&########## ",
  400.                                          "info")
  401.                                 # connection terminated: GoAway
  402.                                 self._kill_all_streams()
  403.                                 return
  404.  
  405.                     self._cleanup_streams()
  406.         except Exception as e:  # pragma: no cover
  407.             self.log(repr(e), "info")
  408.             self.log("BH - break point 405 of #################### http2.py &&&&&&&&&&########## ", "info")
  409.             self._kill_all_streams()
  410.  
  411.  
  412. def detect_zombie_stream(func):  # pragma: no cover
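    # Decorator for Http2SingleStreamLayer methods: check that the stream is still
    # alive before and after the wrapped call, raising Http2ZombieException so the
    # worker thread unwinds instead of blocking on a dead stream.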
  413.     @functools.wraps(func)
  414.     def wrapper(self, *args, **kwargs):
  415.         self.raise_zombie()
  416.         result = func(self, *args, **kwargs)
  417.         self.raise_zombie()
  418.         return result
  419.  
  420.     return wrapper
  421.  
  422.  
  423. class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThread):
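    # One worker thread per HTTP/2 stream. run() drives a regular httpbase.HttpLayer on
    # top of this transmission layer, while Http2Layer's main loop feeds the request and
    # response Message buffers (headers, DATA frames, trailers, events) from the network.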
  424.  
  425.     class Message:
  426.         def __init__(self, headers=None):
  427.             self.headers: Optional[mitmproxy.net.http.Headers] = headers  # headers are the first thing to be received on a new stream
  428.             self.data_queue: queue.Queue[bytes] = queue.Queue()  # contains raw contents of DATA frames
  429.             self.queued_data_length = 0  # used to enforce mitmproxy's config.options.body_size_limit
  430.             self.trailers: Optional[mitmproxy.net.http.Headers] = None  # trailers are received after stream_ended is set
  431.  
  432.             self.arrived = threading.Event()  # indicates the HEADERS+CONTINUATION frames have been received
  433.             self.stream_ended = threading.Event()  # indicates that a frame with the END_STREAM flag has been received
  434.  
  435.     def __init__(self, ctx, h2_connection, stream_id: int, request_headers: mitmproxy.net.http.Headers) -> None:
  436.         super().__init__(
  437.             ctx, name=f"Http2SingleStreamLayer-{stream_id}"
  438.         )
  439.         self.h2_connection = h2_connection
  440.         self.zombie: Optional[float] = None
  441.         self.client_stream_id: int = stream_id
  442.         self.server_stream_id: Optional[int] = None
  443.         self.pushed = False
  444.  
  445.         self.timestamp_start: Optional[float] = None
  446.         self.timestamp_end: Optional[float] = None
  447.  
  448.         self.request_message = self.Message(request_headers)
  449.         self.response_message = self.Message()
  450.  
  451.         self.priority_exclusive: bool
  452.         self.priority_depends_on: Optional[int] = None
  453.         self.priority_weight: Optional[int] = None
  454.         self.handled_priority_event: Any = None
  455.  
  456.     def kill(self):
  457.         if not self.zombie:
  458.             self.zombie = time.time()
  459.             self.request_message.stream_ended.set()
  460.             self.request_message.arrived.set()
  461.             self.response_message.arrived.set()
  462.             self.response_message.stream_ended.set()
  463.  
  464.     def connect(self):  # pragma: no cover
  465.         raise exceptions.Http2ProtocolException("HTTP2 layer should already have a connection.")
  466.  
  467.     def disconnect(self):  # pragma: no cover
  468.         raise exceptions.Http2ProtocolException("Cannot dis- or reconnect in HTTP2 connections.")
  469.  
  470.     def set_server(self, address):  # pragma: no cover
  471.         raise exceptions.SetServerNotAllowedException(repr(address))
  472.  
  473.     def check_close_connection(self, flow):
  474.         # This layer only handles a single stream.
  475.         # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
  476.         return True
  477.  
  478.     @property
  479.     def data_queue(self):
  480.         if self.response_message.arrived.is_set():
  481.             return self.response_message.data_queue
  482.         else:
  483.             return self.request_message.data_queue
  484.  
  485.     @property
  486.     def queued_data_length(self):
  487.         if self.response_message.arrived.is_set():
  488.             return self.response_message.queued_data_length
  489.         else:
  490.             return self.request_message.queued_data_length
  491.  
  492.     @queued_data_length.setter
  493.     def queued_data_length(self, v):
  494.         (self.response_message if self.response_message.arrived.is_set() else self.request_message).queued_data_length = v
  495.  
  496.     @property
  497.     def stream_ended(self):
  498.         # This indicates that all message headers, the full message body, and all trailers have been received
  499.         # https://tools.ietf.org/html/rfc7540#section-8.1
  500.         if self.response_message.arrived.is_set():
  501.             return self.response_message.stream_ended
  502.         else:
  503.             return self.request_message.stream_ended
  504.  
  505.     @property
  506.     def trailers(self):
  507.         if self.response_message.arrived.is_set():
  508.             return self.response_message.trailers
  509.         else:
  510.             return self.request_message.trailers
  511.  
  512.     @trailers.setter
  513.     def trailers(self, v):
  514.         if self.response_message.arrived.is_set():
  515.             self.response_message.trailers = v
  516.         else:
  517.             self.request_message.trailers = v
  518.  
  519.     def raise_zombie(self, pre_command=None):  # pragma: no cover
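        # Abort if this stream was killed or the whole h2 connection is closed.
        # pre_command (typically a lock release callback) runs first so no locks are
        # leaked when the exception propagates.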
  520.         connection_closed = self.h2_connection.state_machine.state == h2.connection.ConnectionState.CLOSED
  521.         if self.zombie is not None or connection_closed:
  522.             if pre_command is not None:
  523.                 pre_command()
  524.             raise exceptions.Http2ZombieException(f"Connection or stream already dead: {self.zombie}, {connection_closed}")
  525.  
  526.     @detect_zombie_stream
  527.     def read_request_headers(self, flow):
  528.         self.request_message.arrived.wait()
  529.         self.raise_zombie()
  530.  
  531.         if self.pushed:
  532.             flow.metadata['h2-pushed-stream'] = True
  533.  
  534.         # pseudo header must be present, see https://http2.github.io/http2-spec/#rfc.section.8.1.2.3
  535.         authority = self.request_message.headers.pop(':authority', "")
  536.         method = self.request_message.headers.pop(':method')
  537.         scheme = self.request_message.headers.pop(':scheme')
  538.         path = self.request_message.headers.pop(':path')
  539.  
  540.         host, port = url.parse_authority(authority, check=True)
  541.         port = port or url.default_port(scheme) or 0
  542.  
  543.         return http.HTTPRequest(
  544.             host,
  545.             port,
  546.             method.encode(),
  547.             scheme.encode(),
  548.             authority.encode(),
  549.             path.encode(),
  550.             b"HTTP/2.0",
  551.             self.request_message.headers,
  552.             None,
  553.             None,
  554.             self.timestamp_start,
  555.             self.timestamp_end,
  556.         )
  557.  
  558.     @detect_zombie_stream
  559.     def read_request_body(self, request):
  560.         if not request.stream:
  561.             self.request_message.stream_ended.wait()
  562.  
  563.         while True:
  564.             try:
  565.                 yield self.request_message.data_queue.get(timeout=0.1)
  566.             except queue.Empty:  # pragma: no cover
  567.                 pass
  568.             if self.request_message.stream_ended.is_set():
  569.                 self.raise_zombie()
  570.                 while self.request_message.data_queue.qsize() > 0:
  571.                     yield self.request_message.data_queue.get()
  572.                 break
  573.             self.raise_zombie()
  574.  
  575.     @detect_zombie_stream
  576.     def read_request_trailers(self, request):
  577.         return self.request_message.trailers
  578.  
  579.     @detect_zombie_stream
  580.     def send_request_headers(self, request):
  581.         if self.pushed:
  582.             # nothing to do here
  583.             return
  584.  
  585.         while True:
  586.             self.raise_zombie()
  587.             self.connections[self.server_conn].lock.acquire()
  588.  
  589.             max_streams = self.connections[self.server_conn].remote_settings.max_concurrent_streams
  590.             if self.connections[self.server_conn].open_outbound_streams + 1 >= max_streams:
  591.                 # wait until we get a free slot for a new outgoing stream
  592.                 self.connections[self.server_conn].lock.release()
  593.                 time.sleep(0.1)
  594.                 continue
  595.  
  596.             # keep the lock
  597.             break
  598.  
  599.         # We must not assign a stream id if we are already a zombie.
  600.         self.raise_zombie()
  601.  
  602.         self.server_stream_id = self.connections[self.server_conn].get_next_available_stream_id()
  603.         self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
  604.  
  605.         headers = request.headers.copy()
  606.         if request.authority:
  607.             headers.insert(0, ":authority", request.authority)
  608.         headers.insert(0, ":path", request.path)
  609.         headers.insert(0, ":method", request.method)
  610.         headers.insert(0, ":scheme", request.scheme)
  611.  
  612.         priority_exclusive = None
  613.         priority_depends_on = None
  614.         priority_weight = None
  615.         if self.handled_priority_event:
  616.             # only send priority information if they actually came with the original HeadersFrame
  617.             # and not if they got updated before/after with a PriorityFrame
  618.             if not self.config.options.http2_priority:
  619.                 self.log("HTTP/2 PRIORITY information in HEADERS frame suppressed. Use --http2-priority to enable forwarding.", "debug")
  620.             else:
  621.                 priority_exclusive = self.priority_exclusive
  622.                 priority_depends_on = self._map_depends_on_stream_id(self.server_stream_id, self.priority_depends_on)
  623.                 priority_weight = self.priority_weight
  624.  
  625.         try:
  626.             self.connections[self.server_conn].safe_send_headers(
  627.                 self.raise_zombie,
  628.                 self.server_stream_id,
  629.                 headers,
  630.                 end_stream=(False if request.content or request.trailers or request.stream else True),
  631.                 priority_exclusive=priority_exclusive,
  632.                 priority_depends_on=priority_depends_on,
  633.                 priority_weight=priority_weight,
  634.             )
  635.         except Exception as e:  # pragma: no cover
  636.             raise e
  637.         finally:
  638.             self.raise_zombie()
  639.             self.connections[self.server_conn].lock.release()
  640.  
  641.     @detect_zombie_stream
  642.     def send_request_body(self, request, chunks):
  643.         if self.pushed:
  644.             # nothing to do here
  645.             return
  646.  
  647.         if isinstance(chunks, types.GeneratorType) or (chunks and chunks[0]):
  648.             self.connections[self.server_conn].safe_send_body(
  649.                 self.raise_zombie,
  650.                 self.server_stream_id,
  651.                 chunks,
  652.                 end_stream=(request.trailers is None),
  653.             )
  654.  
  655.     @detect_zombie_stream
  656.     def send_request_trailers(self, request):
  657.         self._send_trailers(self.server_conn, request.trailers)
  658.  
  659.     @detect_zombie_stream
  660.     def send_request(self, request):
  661.         self.send_request_headers(request)
  662.         self.send_request_body(request, [request.content])
  663.         self.send_request_trailers(request)
  664.  
  665.     @detect_zombie_stream
  666.     def read_response_headers(self):
  667.         self.response_message.arrived.wait()
  668.  
  669.         self.raise_zombie()
  670.  
  671.         status_code = int(self.response_message.headers.get(':status', 502))
  672.         headers = self.response_message.headers.copy()
  673.         headers.pop(":status", None)
  674.  
  675.         return http.HTTPResponse(
  676.             http_version=b"HTTP/2.0",
  677.             status_code=status_code,
  678.             reason=b'',
  679.             headers=headers,
  680.             content=None,
  681.             trailers=None,
  682.             timestamp_start=self.timestamp_start,
  683.             timestamp_end=self.timestamp_end,
  684.         )
  685.  
  686.     @detect_zombie_stream
  687.     def read_response_body(self, request, response):
  688.         while True:
  689.             try:
  690.                 yield self.response_message.data_queue.get(timeout=0.1)
  691.             except queue.Empty:  # pragma: no cover
  692.                 pass
  693.             if self.response_message.stream_ended.is_set():
  694.                 self.raise_zombie()
  695.                 while self.response_message.data_queue.qsize() > 0:
  696.                     yield self.response_message.data_queue.get()
  697.                 break
  698.             self.raise_zombie()
  699.  
  700.     @detect_zombie_stream
  701.     def read_response_trailers(self, request, response):
  702.         return self.response_message.trailers
  703.  
  704.     @detect_zombie_stream
  705.     def send_response_headers(self, response):
  706.         headers = response.headers.copy()
  707.         headers.insert(0, ":status", str(response.status_code))
  708.         with self.connections[self.client_conn].lock:
  709.             self.connections[self.client_conn].safe_send_headers(
  710.                 self.raise_zombie,
  711.                 self.client_stream_id,
  712.                 headers
  713.             )
  714.  
  715.     @detect_zombie_stream
  716.     def send_response_body(self, response, chunks):
  717.         self.connections[self.client_conn].safe_send_body(
  718.             self.raise_zombie,
  719.             self.client_stream_id,
  720.             chunks,
  721.             end_stream=(response.trailers is None),
  722.         )
  723.  
  724.     @detect_zombie_stream
  725.     def send_response_trailers(self, response):
  726.         self._send_trailers(self.client_conn, response.trailers)
  727.  
  728.     def _send_trailers(self, conn, trailers):
  729.         if not trailers:
  730.             return
  731.         with self.connections[conn].lock:
  732.             self.connections[conn].safe_send_headers(
  733.                 self.raise_zombie,
  734.                 self.server_stream_id if conn == self.server_conn else self.client_stream_id,  # send trailers on the stream that belongs to this connection
  735.                 trailers,
  736.                 end_stream=True
  737.             )
  738.  
  739.     def __call__(self):  # pragma: no cover
  740.         raise OSError('Http2SingleStreamLayer must be run as thread')
  741.  
  742.     def run(self):
  743.         self.log("exception block - break point 742 of ################n http2.py &&&&########## ", "info")
  744.         layer = httpbase.HttpLayer(self, self.mode)
  745.         self.log("exception block - break point 744 of ################n http2.py &&&&########## ", "info")
  746.         try:
  747.             self.log("before layer() BH - break point 747 of #################### http2.py &&&&&&&########## ", "info")
  748.             layer()
  749.             self.log("after Layer() BH - break point 749 of ################ http2.py &&&&&&######## ", "info")
  750.         except exceptions.Http2ZombieException:  # pragma: no cover
  751.             # zombies can be safely terminated - no need to kill them twice
  752.             return
  753.         except exceptions.ProtocolException as e:  # pragma: no cover
  754.             self.log(repr(e), "info")
  755.             self.log("BH - break point 750 of #################### http2.py &&&&&&&&&&########## ", "info")
  756.         except exceptions.SetServerNotAllowedException as e:  # pragma: no cover
  757.             self.log(f"Changing the Host server for HTTP/2 connections not allowed: {e}", "info")
  758.             self.log("BH - break point 753 of #################### http2.py &&&&&&&&&&########## ", "info")
  759.         except exceptions.Kill:  # pragma: no cover
  760.             self.log(flow.Error.KILLED_MESSAGE, "info")
  761.             self.log("BH - break point 756 of #################### http2.py &&&&&&&&&&########## ", "info")
  762.  
  763.         self.kill()
  764.  