EventSourceResponse.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
  2. using System;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Text;
  6. using System.Collections.Generic;
  7. namespace BestHTTP.ServerSentEvents
  8. {
  9. /// <summary>
  10. /// A low-level class to receive and parse an EventSource(http://www.w3.org/TR/eventsource/) stream.
  11. /// Higher level protocol representation is implemented in the EventSource class.
  12. /// </summary>
  13. public sealed class EventSourceResponse : HTTPResponse, IProtocol
  14. {
  /// <summary>
  /// True once the receiving thread function has finished; ReceiveThreadFunc sets it in its finally block.
  /// </summary>
  public bool IsClosed { get; private set; }

  #region Public Events

  /// <summary>
  /// Called from IProtocol.HandleEvents with this response and each completed message.
  /// </summary>
  public Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> OnMessage;

  /// <summary>
  /// Called from IProtocol.HandleEvents when IsClosed is true. HandleEvents resets it to null after the call.
  /// </summary>
  public Action<EventSourceResponse> OnClosed;

  #endregion

  #region Privates

  /// <summary>
  /// Lock object guarding CompletedMessages. Read-only so the monitor can never be swapped out from under a waiting thread.
  /// </summary>
  private readonly object FrameLock = new object();

  /// <summary>
  /// Accumulates the bytes of the line currently being received. Allocated on first use by FeedData.
  /// </summary>
  private byte[] LineBuffer;

  /// <summary>
  /// Number of valid bytes currently stored in LineBuffer.
  /// </summary>
  private int LineBufferPos = 0;

  /// <summary>
  /// The message whose fields are currently being received and parsed; null when no field has been seen since the last blank line.
  /// </summary>
  private BestHTTP.ServerSentEvents.Message CurrentMessage;

  /// <summary>
  /// Completed messages waiting to be dispatched by IProtocol.HandleEvents. Access it only while holding FrameLock.
  /// </summary>
  private readonly List<BestHTTP.ServerSentEvents.Message> CompletedMessages = new List<BestHTTP.ServerSentEvents.Message>();

  #endregion
  /// <summary>
  /// Creates the response by forwarding every argument to the HTTPResponse constructor,
  /// then sets IsClosedManually to true.
  /// </summary>
  public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
      : base(request, stream, isStreamed, isFromCache)
  {
      this.IsClosedManually = true;
  }
  /// <summary>
  /// Runs the base Receive with payload reading turned off, then decides whether the response
  /// is an event stream: it must have been received, have status code 200 and a content-type
  /// header starting with "text/event-stream" (compared case-insensitively).
  /// </summary>
  /// <param name="forceReadRawContentLength">Forwarded to the base Receive and, when the response is not an event stream, to ReadPayload.</param>
  /// <param name="readPayloadData">Not used here; the base implementation is always called with false.</param>
  /// <returns>The value returned by the base Receive call.</returns>
  public override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
  {
      bool received = base.Receive(forceReadRawContentLength, false);

      string contentType = this.GetFirstHeaderValue("content-type");

      // Ordinal, case-insensitive prefix test: no temporary lower-cased string and no
      // dependence on the current culture's casing or collation rules.
      base.IsUpgraded = received &&
                        this.StatusCode == 200 &&
                        !string.IsNullOrEmpty(contentType) &&
                        contentType.StartsWith("text/event-stream", StringComparison.OrdinalIgnoreCase);

      // If the response was not upgraded to the protocol, the whole payload has to be read here;
      // otherwise a later request could read the leftover bytes as HTTP headers and fail.
      if (!IsUpgraded)
          ReadPayload(forceReadRawContentLength);

      return received;
  }
  /// <summary>
  /// Queues ReceiveThreadFunc on the thread pool when the response has been upgraded
  /// to an event stream; does nothing otherwise.
  /// </summary>
  internal void StartReceive()
  {
      if (!IsUpgraded)
          return;

  #if NETFX_CORE
      #pragma warning disable 4014
      Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
      #pragma warning restore 4014
  #else
      ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveThreadFunc));
  #endif
  }
  76. #region Private Threading Functions
  /// <summary>
  /// Thread-pool entry point: reads the stream (chunked or raw, depending on the
  /// transfer-encoding header) until it ends or fails, records the failure on the
  /// request, and always marks this response as closed.
  /// </summary>
  /// <param name="param">Unused work-item state.</param>
  private void ReceiveThreadFunc(object param)
  {
      try
      {
          bool isChunked = HasHeaderWithValue("transfer-encoding", "chunked");

          if (!isChunked)
              ReadRaw(Stream, -1);
          else
              ReadChunked(Stream);
      }
  #if !NETFX_CORE
      catch (ThreadAbortException)
      {
          this.baseRequest.State = HTTPRequestStates.Aborted;
      }
  #endif
      catch (Exception e)
      {
          if (!HTTPUpdateDelegator.IsCreated)
          {
              // No update delegator: the request is reported as aborted instead of failed.
              this.baseRequest.State = HTTPRequestStates.Aborted;
          }
          else
          {
              this.baseRequest.Exception = e;
              this.baseRequest.State = HTTPRequestStates.Error;
          }
      }
      finally
      {
          // Whatever happened above, reading is over.
          IsClosed = true;
      }
  }
  107. #endregion
  108. #region Read Implementations
  109. // http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
  /// <summary>
  /// Reads a chunked-encoded body and passes every chunk's data to FeedData.
  /// Stops at the zero-length chunk, then reads the trailing headers (or the final CRLF).
  /// The pooled read buffer is returned to the pool even when reading fails.
  /// </summary>
  /// <param name="stream">The stream to read the chunks from.</param>
  /// <exception cref="Exception">The stream reported end-of-data in the middle of a chunk.</exception>
  private new void ReadChunked(Stream stream)
  {
      int chunkLength = ReadChunkLength(stream);
      byte[] buffer = Extensions.VariableSizedBufferPool.Get(chunkLength, true);

      try
      {
          while (chunkLength != 0)
          {
              // To avoid more GC garbage only one buffer is used; it is resized only if the next chunk doesn't fit.
              if (buffer.Length < chunkLength)
                  Extensions.VariableSizedBufferPool.Resize(ref buffer, chunkLength, true);

              int readBytes = 0;

              // Fill up the buffer with the whole chunk.
              do
              {
                  int bytes = stream.Read(buffer, readBytes, chunkLength - readBytes);
                  if (bytes == 0)
                      throw new Exception("The remote server closed the connection unexpectedly!");

                  readBytes += bytes;
              } while (readBytes < chunkLength);

              FeedData(buffer, readBytes);

              // Every chunk's data has a trailing CRLF.
              ReadTo(stream, LF);

              // Read the next chunk's length.
              chunkLength = ReadChunkLength(stream);
          }
      }
      finally
      {
          // Runs on the failure paths too, so the buffer always goes back to the pool.
          Extensions.VariableSizedBufferPool.Release(buffer);
      }

      // Read the trailing headers or the CRLF.
      ReadHeaders(stream);
  }
  /// <summary>
  /// Reads the stream in blocks until Read reports no more data, passing every block to FeedData.
  /// The pooled read buffer is returned to the pool even when reading or parsing throws.
  /// </summary>
  /// <param name="stream">The stream to read from.</param>
  /// <param name="contentLength">Not used by this implementation.</param>
  private new void ReadRaw(Stream stream, long contentLength)
  {
      byte[] buffer = Extensions.VariableSizedBufferPool.Get(1024, true);

      try
      {
          int bytes;
          while ((bytes = stream.Read(buffer, 0, buffer.Length)) > 0)
              FeedData(buffer, bytes);
      }
      finally
      {
          Extensions.VariableSizedBufferPool.Release(buffer);
      }
  }
  149. #endregion
  150. #region Data Parsing
  /// <summary>
  /// Set when the previous FeedData call ended on a CR. If the next call starts with an LF,
  /// that LF is the second half of a CRLF pair and must not be treated as another line ending.
  /// </summary>
  private bool LastFeedEndedWithCR;

  /// <summary>
  /// Splits the received bytes into lines and passes every completed line to ParseLine.
  /// Lines end with CRLF, a single LF or a single CR. Bytes of an unfinished line are kept
  /// in LineBuffer until a later call delivers the line ending.
  /// </summary>
  /// <param name="buffer">The received bytes. They are copied, so the caller can reuse the array.</param>
  /// <param name="count">Number of valid bytes in buffer; -1 means the whole array.</param>
  public void FeedData(byte[] buffer, int count)
  {
      if (count == -1)
          count = buffer.Length;

      if (count == 0)
          return;

      if (LineBuffer == null)
          LineBuffer = Extensions.VariableSizedBufferPool.Get(1024, true);

      int pos = 0;

      // A CRLF pair can be split across two calls: the CR was already handled as a line ending,
      // so a leading LF here belongs to it. Treating it as a new line would inject a blank line
      // and dispatch the current message too early.
      if (LastFeedEndedWithCR)
      {
          LastFeedEndedWithCR = false;

          if (buffer[0] == HTTPResponse.LF)
              pos = 1;
      }

      while (pos < count)
      {
          int newlineIdx = -1;
          int skipCount = 1; // number of line-ending bytes to skip: 1 for CR or LF, 2 for CRLF

          for (int i = pos; i < count && newlineIdx == -1; ++i)
          {
              if (buffer[i] == HTTPResponse.CR)
              {
                  if (i + 1 < count)
                  {
                      if (buffer[i + 1] == HTTPResponse.LF)
                          skipCount = 2;
                  }
                  else
                  {
                      // CR is the last byte we have; the matching LF may arrive with the next call.
                      LastFeedEndedWithCR = true;
                  }

                  newlineIdx = i;
              }
              else if (buffer[i] == HTTPResponse.LF)
                  newlineIdx = i;
          }

          // Copy everything up to the line ending (or to the end of the data) after the bytes
          // already collected for this line.
          int copyIndex = newlineIdx == -1 ? count : newlineIdx;
          int copyLength = copyIndex - pos;

          if (LineBuffer.Length < LineBufferPos + copyLength)
          {
              // NOTE(review): relies on Resize keeping the already buffered bytes, as the original code did.
              Extensions.VariableSizedBufferPool.Resize(ref LineBuffer, LineBufferPos + copyLength, true);
          }

          Array.Copy(buffer, pos, LineBuffer, LineBufferPos, copyLength);
          LineBufferPos += copyLength;

          // No line ending in the remaining data: wait for more.
          if (newlineIdx == -1)
              return;

          ParseLine(LineBuffer, LineBufferPos);
          LineBufferPos = 0;

          pos = newlineIdx + skipCount;
      }
  }
  /// <summary>
  /// Processes one line of the event stream.
  /// A blank line completes the current message and queues it for dispatching; a line starting
  /// with ':' is ignored; any other line is split into a field name and a value and applied to
  /// the current message ("id", "event", "data", "retry"; unknown fields are ignored).
  /// </summary>
  /// <param name="buffer">Buffer holding the bytes of the line, without the line ending.</param>
  /// <param name="count">Number of bytes of the line in buffer.</param>
  void ParseLine(byte[] buffer, int count)
  {
      // If the line is empty (a blank line) => dispatch the event.
      if (count == 0)
      {
          if (CurrentMessage != null)
          {
              lock (FrameLock)
                  CompletedMessages.Add(CurrentMessage);
              CurrentMessage = null;
          }

          return;
      }

      // If the line starts with a U+003A COLON character (:) => ignore the line.
      if (buffer[0] == 0x3A)
          return;

      // Find the first U+003A COLON character (:), if there is one.
      int colonIdx = -1;
      for (int i = 0; i < count; ++i)
      {
          if (buffer[i] == 0x3A)
          {
              colonIdx = i;
              break;
          }
      }

      string field;
      string value;

      if (colonIdx != -1)
      {
          // Characters before the first colon are the field name.
          field = Encoding.UTF8.GetString(buffer, 0, colonIdx);

          // Characters after the first colon are the value; a single leading U+0020 SPACE is removed.
          int valueStart = colonIdx + 1;
          if (valueStart < count && buffer[valueStart] == 0x20)
              valueStart++;

          // Nothing after the colon (e.g. "data:") means an empty value, exactly like a line
          // without a colon. The field still has to be processed, not dropped.
          value = valueStart < count
              ? Encoding.UTF8.GetString(buffer, valueStart, count - valueStart)
              : string.Empty;
      }
      else
      {
          // No colon: the whole line is the field name and the value is the empty string.
          field = Encoding.UTF8.GetString(buffer, 0, count);
          value = string.Empty;
      }

      if (CurrentMessage == null)
          CurrentMessage = new BestHTTP.ServerSentEvents.Message();

      switch (field)
      {
          // "id" => set the last event ID to the field value.
          case "id":
              CurrentMessage.Id = value;
              break;

          // "event" => set the event type to the field value.
          case "event":
              CurrentMessage.Event = value;
              break;

          // "data" => append the field value to the data, separating lines with a single U+000A LINE FEED.
          case "data":
              // The separator is added in front of every line but the first, so no trailing LF has to be
              // stripped later. Only a null check is done because an empty string is a valid payload.
              // A literal "\n" is used so the result is the same on every platform.
              if (CurrentMessage.Data != null)
                  CurrentMessage.Data += "\n";
              CurrentMessage.Data += value;
              break;

          // "retry" => if the value consists of only ASCII digits, use it as the reconnection time
          // in milliseconds; otherwise ignore the field.
          case "retry":
              int result;
              // NumberStyles.None accepts digits only: no sign, whitespace or group separators.
              if (int.TryParse(value,
                               System.Globalization.NumberStyles.None,
                               System.Globalization.CultureInfo.InvariantCulture,
                               out result))
                  CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
              break;

          // Otherwise: the field is ignored.
          default:
              break;
      }
  }
  267. #endregion
  /// <summary>
  /// Dispatches the queued messages through OnMessage and, when the response is closed,
  /// calls OnClosed once and resets it to null.
  /// </summary>
  void IProtocol.HandleEvents()
  {
      DispatchCompletedMessages();

      // We are closed
      if (IsClosed)
      {
          // IsClosed is set only after the receiving function has finished, so one more pass picks up
          // the messages completed after the pass above instead of throwing them away.
          DispatchCompletedMessages();

          if (OnClosed != null)
          {
              try
              {
                  OnClosed(this);
              }
              catch (Exception ex)
              {
                  HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
              }
              finally
              {
                  OnClosed = null;
              }
          }
      }
  }

  /// <summary>
  /// Takes the queued messages out of CompletedMessages under FrameLock and passes them to
  /// OnMessage after the lock has been released, so a slow handler cannot block the thread
  /// that adds new messages. Messages are dropped when no OnMessage handler is set.
  /// </summary>
  private void DispatchCompletedMessages()
  {
      Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> onMessage = OnMessage;
      BestHTTP.ServerSentEvents.Message[] pending = null;

      lock (FrameLock)
      {
          if (CompletedMessages.Count > 0)
          {
              if (onMessage != null)
                  pending = CompletedMessages.ToArray();

              CompletedMessages.Clear();
          }
      }

      if (pending == null)
          return;

      // Send out messages.
      for (int i = 0; i < pending.Length; ++i)
      {
          try
          {
              onMessage(this, pending[i]);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
          }
      }
  }
  311. }
  312. }
  313. #endif