_json_pipe.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # Copyright (c) Microsoft Corporation.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import asyncio
  15. from typing import Dict, Optional, cast
  16. from pyee.asyncio import AsyncIOEventEmitter
  17. from playwright._impl._connection import Channel
  18. from playwright._impl._errors import TargetClosedError
  19. from playwright._impl._helper import Error, ParsedMessagePayload
  20. from playwright._impl._transport import Transport
  21. class JsonPipeTransport(AsyncIOEventEmitter, Transport):
  22. def __init__(
  23. self,
  24. loop: asyncio.AbstractEventLoop,
  25. pipe_channel: Channel,
  26. ) -> None:
  27. super().__init__(loop)
  28. Transport.__init__(self, loop)
  29. self._stop_requested = False
  30. self._pipe_channel = pipe_channel
  31. def request_stop(self) -> None:
  32. self._stop_requested = True
  33. self._pipe_channel.send_no_reply("close", None, {})
  34. def dispose(self) -> None:
  35. self.on_error_future.cancel()
  36. self._stopped_future.cancel()
  37. async def wait_until_stopped(self) -> None:
  38. await self._stopped_future
  39. async def connect(self) -> None:
  40. self._stopped_future: asyncio.Future = asyncio.Future()
  41. def handle_message(message: Dict) -> None:
  42. if self._stop_requested:
  43. return
  44. self.on_message(cast(ParsedMessagePayload, message))
  45. def handle_closed(reason: Optional[str]) -> None:
  46. self.emit("close", reason)
  47. if reason:
  48. self.on_error_future.set_exception(TargetClosedError(reason))
  49. self._stopped_future.set_result(None)
  50. self._pipe_channel.on(
  51. "message",
  52. lambda params: handle_message(params["message"]),
  53. )
  54. self._pipe_channel.on(
  55. "closed",
  56. lambda params: handle_closed(params.get("reason")),
  57. )
  58. async def run(self) -> None:
  59. await self._stopped_future
  60. def send(self, message: Dict) -> None:
  61. if self._stop_requested:
  62. raise Error("Playwright connection closed")
  63. self._pipe_channel.send_no_reply("send", None, {"message": message})