asyncio.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. """ZAP Authenticator integrated with the asyncio IO loop.
  2. .. versionadded:: 15.2
  3. """
  4. # Copyright (C) PyZMQ Developers
  5. # Distributed under the terms of the Modified BSD License.
  6. import asyncio
  7. import warnings
  8. from typing import Any, Optional
  9. import zmq
  10. from zmq.asyncio import Poller
  11. from .base import Authenticator
  12. class AsyncioAuthenticator(Authenticator):
  13. """ZAP authentication for use in the asyncio IO loop"""
  14. __poller: Optional[Poller]
  15. __task: Any
  16. def __init__(
  17. self,
  18. context: Optional["zmq.Context"] = None,
  19. loop: Any = None,
  20. encoding: str = 'utf-8',
  21. log: Any = None,
  22. ):
  23. super().__init__(context, encoding, log)
  24. if loop is not None:
  25. warnings.warn(
  26. f"{self.__class__.__name__}(loop) is deprecated and ignored",
  27. DeprecationWarning,
  28. stacklevel=2,
  29. )
  30. self.__poller = None
  31. self.__task = None
  32. async def __handle_zap(self) -> None:
  33. while self.__poller is not None:
  34. events = await self.__poller.poll()
  35. if self.zap_socket in dict(events):
  36. msg = self.zap_socket.recv_multipart()
  37. await self.handle_zap_message(msg)
  38. def start(self) -> None:
  39. """Start ZAP authentication"""
  40. super().start()
  41. self.__poller = Poller()
  42. self.__poller.register(self.zap_socket, zmq.POLLIN)
  43. self.__task = asyncio.ensure_future(self.__handle_zap())
  44. def stop(self) -> None:
  45. """Stop ZAP authentication"""
  46. if self.__task:
  47. self.__task.cancel()
  48. if self.__poller:
  49. self.__poller.unregister(self.zap_socket)
  50. self.__poller = None
  51. super().stop()
  52. __all__ = ["AsyncioAuthenticator"]