# _core.py (21 KB)
  1. import socket
  2. import struct
  3. import threading
  4. import time
  5. from typing import Optional, Union
  6. # websocket modules
  7. from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
  8. from ._exceptions import (
  9. WebSocketProtocolException,
  10. WebSocketConnectionClosedException,
  11. WebSocketTimeoutException,
  12. )
  13. from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake
  14. from ._http import connect, proxy_info
  15. from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
  16. from ._socket import getdefaulttimeout, recv, send, sock_opt
  17. from ._ssl_compat import ssl
  18. from ._utils import NoLock
  19. from ._dispatcher import DispatcherBase, WrappedDispatcher
  20. """
  21. _core.py
  22. websocket - WebSocket client library for Python
  23. Copyright 2025 engn33r
  24. Licensed under the Apache License, Version 2.0 (the "License");
  25. you may not use this file except in compliance with the License.
  26. You may obtain a copy of the License at
  27. http://www.apache.org/licenses/LICENSE-2.0
  28. Unless required by applicable law or agreed to in writing, software
  29. distributed under the License is distributed on an "AS IS" BASIS,
  30. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  31. See the License for the specific language governing permissions and
  32. limitations under the License.
  33. """
  34. __all__ = ["WebSocket", "create_connection"]
  35. class WebSocket:
  36. """
  37. Low level WebSocket interface.
  38. This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
  39. We can connect to the websocket server and send/receive data.
  40. The following example is an echo client.
  41. >>> import websocket
  42. >>> ws = websocket.WebSocket()
  43. >>> ws.connect("ws://echo.websocket.events")
  44. >>> ws.recv()
  45. 'echo.websocket.events sponsored by Lob.com'
  46. >>> ws.send("Hello, Server")
  47. 19
  48. >>> ws.recv()
  49. 'Hello, Server'
  50. >>> ws.close()
  51. Parameters
  52. ----------
  53. get_mask_key: func
  54. A callable function to get new mask keys, see the
  55. WebSocket.set_mask_key's docstring for more information.
  56. sockopt: tuple
  57. Values for socket.setsockopt.
  58. sockopt must be tuple and each element is argument of sock.setsockopt.
  59. sslopt: dict
  60. Optional dict object for ssl socket options. See FAQ for details.
  61. fire_cont_frame: bool
  62. Fire recv event for each cont frame. Default is False.
  63. enable_multithread: bool
  64. If set to True, lock send method.
  65. skip_utf8_validation: bool
  66. Skip utf8 validation.
  67. """
  68. def __init__(
  69. self,
  70. get_mask_key=None,
  71. sockopt=None,
  72. sslopt=None,
  73. fire_cont_frame: bool = False,
  74. enable_multithread: bool = True,
  75. skip_utf8_validation: bool = False,
  76. dispatcher: Union[DispatcherBase, WrappedDispatcher] = None,
  77. **_,
  78. ):
  79. """
  80. Initialize WebSocket object.
  81. Parameters
  82. ----------
  83. sslopt: dict
  84. Optional dict object for ssl socket options. See FAQ for details.
  85. """
  86. self.sock_opt = sock_opt(sockopt, sslopt)
  87. self.handshake_response = None
  88. self.sock: Optional[socket.socket] = None
  89. self.connected = False
  90. self.get_mask_key = get_mask_key
  91. # These buffer over the build-up of a single frame.
  92. self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
  93. self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
  94. self.dispatcher = dispatcher
  95. if enable_multithread:
  96. self.lock = threading.Lock()
  97. self.readlock = threading.Lock()
  98. else:
  99. self.lock = NoLock() # type: ignore[assignment]
  100. self.readlock = NoLock() # type: ignore[assignment]
  101. def __iter__(self):
  102. """
  103. Allow iteration over websocket, implying sequential `recv` executions.
  104. """
  105. while True:
  106. yield self.recv()
  107. def __next__(self):
  108. return self.recv()
  109. def next(self):
  110. return self.__next__()
  111. def fileno(self):
  112. return self.sock.fileno()
  113. def set_mask_key(self, func):
  114. """
  115. Set function to create mask key. You can customize mask key generator.
  116. Mainly, this is for testing purpose.
  117. Parameters
  118. ----------
  119. func: func
  120. callable object. the func takes 1 argument as integer.
  121. The argument means length of mask key.
  122. This func must return string(byte array),
  123. which length is argument specified.
  124. """
  125. self.get_mask_key = func
  126. def gettimeout(self) -> Optional[Union[float, int]]:
  127. """
  128. Get the websocket timeout (in seconds) as an int or float
  129. Returns
  130. ----------
  131. timeout: int or float
  132. returns timeout value (in seconds). This value could be either float/integer.
  133. """
  134. return self.sock_opt.timeout
  135. def settimeout(self, timeout: Optional[Union[float, int]]):
  136. """
  137. Set the timeout to the websocket.
  138. Parameters
  139. ----------
  140. timeout: int or float
  141. timeout time (in seconds). This value could be either float/integer.
  142. """
  143. self.sock_opt.timeout = timeout
  144. if self.sock:
  145. self.sock.settimeout(timeout)
  146. timeout = property(gettimeout, settimeout)
  147. def getsubprotocol(self):
  148. """
  149. Get subprotocol
  150. """
  151. if self.handshake_response:
  152. return self.handshake_response.subprotocol
  153. else:
  154. return None
  155. subprotocol = property(getsubprotocol)
  156. def getstatus(self):
  157. """
  158. Get handshake status
  159. """
  160. if self.handshake_response:
  161. return self.handshake_response.status
  162. else:
  163. return None
  164. status = property(getstatus)
  165. def getheaders(self):
  166. """
  167. Get handshake response header
  168. """
  169. if self.handshake_response:
  170. return self.handshake_response.headers
  171. else:
  172. return None
  173. def is_ssl(self):
  174. try:
  175. return isinstance(self.sock, ssl.SSLSocket)
  176. except (AttributeError, NameError):
  177. return False
  178. headers = property(getheaders)
  179. def connect(self, url, **options):
  180. """
  181. Connect to url. url is websocket url scheme.
  182. ie. ws://host:port/resource
  183. You can customize using 'options'.
  184. If you set "header" list object, you can set your own custom header.
  185. >>> ws = WebSocket()
  186. >>> ws.connect("ws://echo.websocket.events",
  187. ... header=["User-Agent: MyProgram",
  188. ... "x-custom: header"])
  189. Parameters
  190. ----------
  191. header: list or dict
  192. Custom http header list or dict.
  193. cookie: str
  194. Cookie value.
  195. origin: str
  196. Custom origin url.
  197. connection: str
  198. Custom connection header value.
  199. Default value "Upgrade" set in _handshake.py
  200. suppress_origin: bool
  201. Suppress outputting origin header.
  202. host: str
  203. Custom host header string.
  204. timeout: int or float
  205. Socket timeout time. This value is an integer or float.
  206. If you set None for this value, it means "use default_timeout value"
  207. http_proxy_host: str
  208. HTTP proxy host name.
  209. http_proxy_port: str or int
  210. HTTP proxy port. Default is 80.
  211. http_no_proxy: list
  212. Whitelisted host names that don't use the proxy.
  213. http_proxy_auth: tuple
  214. HTTP proxy auth information. Tuple of username and password. Default is None.
  215. http_proxy_timeout: int or float
  216. HTTP proxy timeout, default is 60 sec as per python-socks.
  217. redirect_limit: int
  218. Number of redirects to follow.
  219. subprotocols: list
  220. List of available subprotocols. Default is None.
  221. socket: socket
  222. Pre-initialized stream socket.
  223. """
  224. self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
  225. self.sock, addrs = connect(
  226. url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
  227. )
  228. try:
  229. self.handshake_response = handshake(self.sock, url, *addrs, **options)
  230. for _ in range(options.pop("redirect_limit", 3)):
  231. if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
  232. url = self.handshake_response.headers["location"]
  233. self.sock.close()
  234. self.sock, addrs = connect(
  235. url,
  236. self.sock_opt,
  237. proxy_info(**options),
  238. options.pop("socket", None),
  239. )
  240. self.handshake_response = handshake(
  241. self.sock, url, *addrs, **options
  242. )
  243. self.connected = True
  244. except:
  245. if self.sock:
  246. self.sock.close()
  247. self.sock = None
  248. raise
  249. def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
  250. """
  251. Send the data as string.
  252. Parameters
  253. ----------
  254. payload: str
  255. Payload must be utf-8 string or unicode,
  256. If the opcode is OPCODE_TEXT.
  257. Otherwise, it must be string(byte array).
  258. opcode: int
  259. Operation code (opcode) to send.
  260. """
  261. frame = ABNF.create_frame(payload, opcode)
  262. return self.send_frame(frame)
  263. def send_text(self, text_data: str) -> int:
  264. """
  265. Sends UTF-8 encoded text.
  266. """
  267. return self.send(text_data, ABNF.OPCODE_TEXT)
  268. def send_bytes(self, data: Union[bytes, bytearray]) -> int:
  269. """
  270. Sends a sequence of bytes.
  271. """
  272. return self.send(data, ABNF.OPCODE_BINARY)
  273. def send_frame(self, frame) -> int:
  274. """
  275. Send the data frame.
  276. >>> ws = create_connection("ws://echo.websocket.events")
  277. >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
  278. >>> ws.send_frame(frame)
  279. >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
  280. >>> ws.send_frame(frame)
  281. >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
  282. >>> ws.send_frame(frame)
  283. Parameters
  284. ----------
  285. frame: ABNF frame
  286. frame data created by ABNF.create_frame
  287. """
  288. if self.get_mask_key:
  289. frame.get_mask_key = self.get_mask_key
  290. data = frame.format()
  291. length = len(data)
  292. if isEnabledForTrace():
  293. trace(f"++Sent raw: {repr(data)}")
  294. trace(f"++Sent decoded: {frame.__str__()}")
  295. with self.lock:
  296. while data:
  297. bytes_sent = self._send(data)
  298. data = data[bytes_sent:]
  299. return length
  300. def send_binary(self, payload: bytes) -> int:
  301. """
  302. Send a binary message (OPCODE_BINARY).
  303. Parameters
  304. ----------
  305. payload: bytes
  306. payload of message to send.
  307. """
  308. return self.send(payload, ABNF.OPCODE_BINARY)
  309. def ping(self, payload: Union[str, bytes] = ""):
  310. """
  311. Send ping data.
  312. Parameters
  313. ----------
  314. payload: str
  315. data payload to send server.
  316. """
  317. if isinstance(payload, str):
  318. payload = payload.encode("utf-8")
  319. self.send(payload, ABNF.OPCODE_PING)
  320. def pong(self, payload: Union[str, bytes] = ""):
  321. """
  322. Send pong data.
  323. Parameters
  324. ----------
  325. payload: str
  326. data payload to send server.
  327. """
  328. if isinstance(payload, str):
  329. payload = payload.encode("utf-8")
  330. self.send(payload, ABNF.OPCODE_PONG)
  331. def recv(self) -> Union[str, bytes]:
  332. """
  333. Receive string data(byte array) from the server.
  334. Returns
  335. ----------
  336. data: string (byte array) value.
  337. """
  338. with self.readlock:
  339. opcode, data = self.recv_data()
  340. if opcode == ABNF.OPCODE_TEXT:
  341. data_received: Union[bytes, str] = data
  342. if isinstance(data_received, bytes):
  343. return data_received.decode("utf-8")
  344. elif isinstance(data_received, str):
  345. return data_received
  346. elif opcode == ABNF.OPCODE_BINARY:
  347. data_binary: bytes = data
  348. return data_binary
  349. else:
  350. return ""
  351. def recv_data(self, control_frame: bool = False) -> tuple:
  352. """
  353. Receive data with operation code.
  354. Parameters
  355. ----------
  356. control_frame: bool
  357. a boolean flag indicating whether to return control frame
  358. data, defaults to False
  359. Returns
  360. -------
  361. opcode, frame.data: tuple
  362. tuple of operation code and string(byte array) value.
  363. """
  364. opcode, frame = self.recv_data_frame(control_frame)
  365. return opcode, frame.data
  366. def recv_data_frame(self, control_frame: bool = False) -> tuple:
  367. """
  368. Receive data with operation code.
  369. If a valid ping message is received, a pong response is sent.
  370. Parameters
  371. ----------
  372. control_frame: bool
  373. a boolean flag indicating whether to return control frame
  374. data, defaults to False
  375. Returns
  376. -------
  377. frame.opcode, frame: tuple
  378. tuple of operation code and string(byte array) value.
  379. """
  380. while True:
  381. frame = self.recv_frame()
  382. if isEnabledForTrace():
  383. trace(f"++Rcv raw: {repr(frame.format())}")
  384. trace(f"++Rcv decoded: {frame.__str__()}")
  385. if not frame:
  386. # handle error:
  387. # 'NoneType' object has no attribute 'opcode'
  388. raise WebSocketProtocolException(f"Not a valid frame {frame}")
  389. elif frame.opcode in (
  390. ABNF.OPCODE_TEXT,
  391. ABNF.OPCODE_BINARY,
  392. ABNF.OPCODE_CONT,
  393. ):
  394. self.cont_frame.validate(frame)
  395. self.cont_frame.add(frame)
  396. if self.cont_frame.is_fire(frame):
  397. return self.cont_frame.extract(frame)
  398. elif frame.opcode == ABNF.OPCODE_CLOSE:
  399. self.send_close()
  400. return frame.opcode, frame
  401. elif frame.opcode == ABNF.OPCODE_PING:
  402. if len(frame.data) < 126:
  403. self.pong(frame.data)
  404. else:
  405. raise WebSocketProtocolException("Ping message is too long")
  406. if control_frame:
  407. return frame.opcode, frame
  408. elif frame.opcode == ABNF.OPCODE_PONG:
  409. if control_frame:
  410. return frame.opcode, frame
  411. def recv_frame(self):
  412. """
  413. Receive data as frame from server.
  414. Returns
  415. -------
  416. self.frame_buffer.recv_frame(): ABNF frame object
  417. """
  418. return self.frame_buffer.recv_frame()
  419. def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
  420. """
  421. Send close data to the server.
  422. Parameters
  423. ----------
  424. status: int
  425. Status code to send. See STATUS_XXX.
  426. reason: str or bytes
  427. The reason to close. This must be string or UTF-8 bytes.
  428. """
  429. if status < 0 or status >= ABNF.LENGTH_16:
  430. raise ValueError("code is invalid range")
  431. self.connected = False
  432. self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
  433. def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: int = 3):
  434. """
  435. Close Websocket object
  436. Parameters
  437. ----------
  438. status: int
  439. Status code to send. See VALID_CLOSE_STATUS in ABNF.
  440. reason: bytes
  441. The reason to close in UTF-8.
  442. timeout: int or float
  443. Timeout until receive a close frame.
  444. If None, it will wait forever until receive a close frame.
  445. """
  446. if not self.connected:
  447. return
  448. if status < 0 or status >= ABNF.LENGTH_16:
  449. raise ValueError("code is invalid range")
  450. try:
  451. self.connected = False
  452. self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
  453. if self.sock is None:
  454. return
  455. sock_timeout = self.sock.gettimeout()
  456. self.sock.settimeout(timeout)
  457. start_time = time.time()
  458. while timeout is None or time.time() - start_time < timeout:
  459. try:
  460. frame = self.recv_frame()
  461. if frame.opcode != ABNF.OPCODE_CLOSE:
  462. continue
  463. if isEnabledForError():
  464. recv_status = struct.unpack("!H", frame.data[0:2])[0]
  465. if recv_status >= 3000 and recv_status <= 4999:
  466. debug(f"close status: {repr(recv_status)}")
  467. elif recv_status != STATUS_NORMAL:
  468. error(f"close status: {repr(recv_status)}")
  469. break
  470. except (
  471. WebSocketConnectionClosedException,
  472. WebSocketTimeoutException,
  473. struct.error,
  474. ):
  475. break
  476. if self.sock is not None:
  477. self.sock.settimeout(sock_timeout)
  478. self.sock.shutdown(socket.SHUT_RDWR)
  479. except:
  480. pass
  481. self.shutdown()
  482. def abort(self):
  483. """
  484. Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
  485. """
  486. if self.connected:
  487. self.sock.shutdown(socket.SHUT_RDWR)
  488. def shutdown(self):
  489. """
  490. close socket, immediately.
  491. """
  492. if self.sock:
  493. self.sock.close()
  494. self.sock = None
  495. self.connected = False
  496. def _send(self, data: Union[str, bytes]):
  497. if self.sock is None:
  498. raise WebSocketConnectionClosedException("socket is already closed.")
  499. if self.dispatcher:
  500. return self.dispatcher.send(self.sock, data)
  501. return send(self.sock, data)
  502. def _recv(self, bufsize):
  503. try:
  504. return recv(self.sock, bufsize)
  505. except WebSocketConnectionClosedException:
  506. if self.sock:
  507. self.sock.close()
  508. self.sock = None
  509. self.connected = False
  510. raise
  511. def create_connection(url: str, timeout=None, class_=WebSocket, **options):
  512. """
  513. Connect to url and return websocket object.
  514. Connect to url and return the WebSocket object.
  515. Passing optional timeout parameter will set the timeout on the socket.
  516. If no timeout is supplied,
  517. the global default timeout setting returned by getdefaulttimeout() is used.
  518. You can customize using 'options'.
  519. If you set "header" list object, you can set your own custom header.
  520. >>> conn = create_connection("ws://echo.websocket.events",
  521. ... header=["User-Agent: MyProgram",
  522. ... "x-custom: header"])
  523. Parameters
  524. ----------
  525. class_: class
  526. class to instantiate when creating the connection. It has to implement
  527. settimeout and connect. It's __init__ should be compatible with
  528. WebSocket.__init__, i.e. accept all of it's kwargs.
  529. header: list or dict
  530. custom http header list or dict.
  531. cookie: str
  532. Cookie value.
  533. origin: str
  534. custom origin url.
  535. suppress_origin: bool
  536. suppress outputting origin header.
  537. host: str
  538. custom host header string.
  539. timeout: int or float
  540. socket timeout time. This value could be either float/integer.
  541. If set to None, it uses the default_timeout value.
  542. http_proxy_host: str
  543. HTTP proxy host name.
  544. http_proxy_port: str or int
  545. HTTP proxy port. If not set, set to 80.
  546. http_no_proxy: list
  547. Whitelisted host names that don't use the proxy.
  548. http_proxy_auth: tuple
  549. HTTP proxy auth information. tuple of username and password. Default is None.
  550. http_proxy_timeout: int or float
  551. HTTP proxy timeout, default is 60 sec as per python-socks.
  552. enable_multithread: bool
  553. Enable lock for multithread.
  554. redirect_limit: int
  555. Number of redirects to follow.
  556. sockopt: tuple
  557. Values for socket.setsockopt.
  558. sockopt must be a tuple and each element is an argument of sock.setsockopt.
  559. sslopt: dict
  560. Optional dict object for ssl socket options. See FAQ for details.
  561. subprotocols: list
  562. List of available subprotocols. Default is None.
  563. skip_utf8_validation: bool
  564. Skip utf8 validation.
  565. socket: socket
  566. Pre-initialized stream socket.
  567. """
  568. sockopt = options.pop("sockopt", [])
  569. sslopt = options.pop("sslopt", {})
  570. fire_cont_frame = options.pop("fire_cont_frame", False)
  571. enable_multithread = options.pop("enable_multithread", True)
  572. skip_utf8_validation = options.pop("skip_utf8_validation", False)
  573. websock = class_(
  574. sockopt=sockopt,
  575. sslopt=sslopt,
  576. fire_cont_frame=fire_cont_frame,
  577. enable_multithread=enable_multithread,
  578. skip_utf8_validation=skip_utf8_validation,
  579. **options,
  580. )
  581. websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
  582. websock.connect(url, **options)
  583. return websock