Advertisement
Guest User

Untitled

a guest
Apr 1st, 2013
130
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.71 KB | None | 0 0
  ; When this file is compiled as the main file, require every variable to be declared.
  CompilerIf #PB_Compiler_IsMainFile
    EnableExplicit
  CompilerEndIf

  ; The stream relies on mutexes and semaphores shared between threads,
  ; so compilation is aborted unless the thread-safe compiler option is enabled.
  CompilerIf Not #PB_Compiler_Thread
    CompilerError "Must enable thread-safe!"
  CompilerEndIf
  8.  
  ; Public interface of the buffered stream.
  ; The method order here must match the order of the entries in BufferedStream_vTable.
  Interface BufferedStream
    close.i() ;Close the stream.
    isClosed.i() ;Tell whether the stream was closed or not.
    writeAscii(ascii.i) ;Write a single Ascii character.
    WriteCharacter(character.i) ;Write a single Character (bound to writeAscii in non-Unicode builds).
    writeBlock(*memory, size.i) ;Write bytes into the stream.
    readAscii.a() ;Read a single Ascii character.
    readBlock.i(*memory, size.i) ;Read bytes into a buffer; returns the number of bytes read.
    readLine.s(bufferSize.i = 1024) ;Read a line of text.
    bytesAvailable.i() ;Tell how many bytes are available in the buffer.
    skip.i(bytes.i) ;Skip bytes.
  EndInterface
  21.  
  ; Internal state of a BufferedStream object.
  ; *vTable must stay the first field so a pointer to this structure can be used
  ; as a BufferedStream interface.
  Structure BufferedStreamS
    *vTable                ; address of BufferedStream_vTable (method table)
    hSemaphoreNotEmpty.i   ; readers wait on this while no bytes are available
    hSemaphoreNotFull.i    ; writers wait on this while the buffer is full
    readerWaits.i          ; set by a reader before it waits; NotEmpty is only signaled while set
    writerWaits.i          ; set by a writer before it waits; NotFull is only signaled while set
    hMutex.i               ; mutex locked around every access to the fields below
    *buffer                ; ring buffer storage
    bufferSize.i           ; capacity of *buffer in bytes
    writePos.i             ; offset in *buffer of the next byte to be written
    readPos.i              ; offset in *buffer of the next byte to be read
    bytesAvailable.i       ; number of bytes written but not yet read
    closed.i               ; #True once close() has been called
    skip.i                 ; number of bytes of future writes that are still to be discarded
  EndStructure
  37.  
  ; new(Classname) expands to a call of Classname_new() without arguments,
  ; e.g. new(BufferedStream) becomes BufferedStream_new().
  Macro new(Classname)
    Classname#_new()
  EndMacro
  41.  
  ; Create a new buffered stream backed by a ring buffer.
  ;
  ; bufferSize: capacity of the ring buffer in bytes; values below 1 are raised to 1.
  ; Returns:    pointer to the new object (usable as a BufferedStream interface),
  ;             or #False if the memory or any synchronization object could not be created.
  Procedure BufferedStream_new(bufferSize.i = 1024)
    Protected *this.BufferedStreamS

    If (bufferSize < 1)
      bufferSize = 1
    EndIf

    *this = AllocateMemory(SizeOf(BufferedStreamS))
    If (Not *this)
      ProcedureReturn #False
    EndIf

    With *this
      \vTable = ?BufferedStream_vTable
      \bufferSize = bufferSize
      \buffer = AllocateMemory(bufferSize, #PB_Memory_NoClear)
      \hSemaphoreNotEmpty = CreateSemaphore(0)
      \hSemaphoreNotFull = CreateSemaphore(1)
      \hMutex = CreateMutex()

      ; Any of the four resources may have failed; release the ones that exist
      ; so a half-initialised object is never handed to the caller.
      If (\buffer = 0 Or \hSemaphoreNotEmpty = 0 Or \hSemaphoreNotFull = 0 Or \hMutex = 0)
        If (\hMutex)
          FreeMutex(\hMutex)
        EndIf
        If (\hSemaphoreNotFull)
          FreeSemaphore(\hSemaphoreNotFull)
        EndIf
        If (\hSemaphoreNotEmpty)
          FreeSemaphore(\hSemaphoreNotEmpty)
        EndIf
        If (\buffer)
          FreeMemory(\buffer)
        EndIf
        FreeMemory(*this)
        ProcedureReturn #False
      EndIf

      ; Initial state: empty buffer, stream open, nothing to skip.
      \readerWaits = #True
      \writerWaits = #False
      \readPos = 0
      \writePos = 0
      \closed = #False
      \skip = 0
      \bytesAvailable = 0
    EndWith

    ProcedureReturn *this
  EndProcedure
  76.  
  ; Mark the stream as closed and, if a reader has flagged itself as waiting,
  ; signal the "not empty" semaphore so it can notice the closed state.
  Procedure BufferedStream_close(*this.BufferedStreamS)
    LockMutex(*this\hMutex)

    *this\closed = #True
    If (*this\readerWaits)
      SignalSemaphore(*this\hSemaphoreNotEmpty)
    EndIf

    UnlockMutex(*this\hMutex)
  EndProcedure
  87.  
  ; Return the value of the closed flag, read while holding the stream mutex.
  Procedure.i BufferedStream_isClosed(*this.BufferedStreamS)
    Protected state.i

    LockMutex(*this\hMutex)
    state = *this\closed
    UnlockMutex(*this\hMutex)

    ProcedureReturn state
  EndProcedure
  98.  
  99. ; 0123456789
  100. ;read: ^
  101. ;write: ^
  102.  
  ; Write a single byte into the stream, blocking while the buffer is full.
  ;
  ; ascii:   value to store (written with PokeA, i.e. as one unsigned byte).
  ; Returns: #True if the byte was stored or consumed by a pending skip,
  ;          #False if the stream is closed (also when it gets closed while
  ;          this call was waiting for free space).
  Procedure.i BufferedStream_writeAscii(*this.BufferedStreamS, ascii.i)
    With *this
      LockMutex(\hMutex)

      If (\closed)
        UnlockMutex(\hMutex)
        ProcedureReturn #False
      EndIf

      ; A pending skip swallows the byte instead of storing it.
      If (\skip > 0)
        \skip - 1
        UnlockMutex(\hMutex)
        ProcedureReturn #True
      EndIf

      While (\bytesAvailable = \bufferSize)
        \writerWaits = #True
        UnlockMutex(\hMutex)
        WaitSemaphore(\hSemaphoreNotFull)
        LockMutex(\hMutex)

        ; The stream may have been closed while the mutex was released;
        ; do not write into a closed stream.
        If (\closed)
          UnlockMutex(\hMutex)
          ProcedureReturn #False
        EndIf
      Wend
      ; NOTE(review): writerWaits is a single flag; with several writers blocked
      ; at once, clearing it here may hide the others from readers - confirm.
      \writerWaits = #False

      PokeA(\buffer + \writePos, ascii)
      \writePos = (\writePos + 1) % \bufferSize
      \bytesAvailable + 1

      ;Debug "writeAscii: ascii=" + ascii + " writePos=" + \writePos

      If (\readerWaits)
        SignalSemaphore(\hSemaphoreNotEmpty)
      EndIf

      UnlockMutex(\hMutex)
    EndWith

    ProcedureReturn #True
  EndProcedure
  141.  
  ; Write size bytes starting at *memory into the stream, blocking whenever the
  ; ring buffer is full until a reader has made room.
  ;
  ; *memory: first byte to write.
  ; size:    number of bytes to write; values <= 0 write nothing.
  ; Returns: #True when all bytes were stored or consumed by a pending skip,
  ;          #False if the stream is closed (also when it gets closed while this
  ;          call was waiting for free space; bytes stored before that stay stored).
  Procedure.i BufferedStream_writeBlock(*this.BufferedStreamS, *memory.Byte, size.i)
    Protected bytesFree.i, *write.Byte

    With *this
      LockMutex(\hMutex)

      If (\closed)
        UnlockMutex(\hMutex)
        ProcedureReturn #False
      EndIf

      ; Nothing to do; also keeps a negative size from increasing \skip below.
      If (size <= 0)
        UnlockMutex(\hMutex)
        ProcedureReturn #True
      EndIf

      If (\skip > 0)
        If (size > \skip)
          ; Drop the first \skip bytes of the block and store the remainder.
          *memory + \skip
          size - \skip
          \skip = 0
        Else
          ; The whole block is swallowed by the pending skip.
          \skip - size
          UnlockMutex(\hMutex)
          ProcedureReturn #True
        EndIf
      EndIf

      While size > 0
        While (\bytesAvailable = \bufferSize)
          \writerWaits = #True
          UnlockMutex(\hMutex)
          WaitSemaphore(\hSemaphoreNotFull)
          LockMutex(\hMutex)

          ; The stream may have been closed while the mutex was released.
          If (\closed)
            UnlockMutex(\hMutex)
            ProcedureReturn #False
          EndIf
        Wend
        \writerWaits = #False

        ; Derive the write pointer only now: \writePos can change while the
        ; mutex is released in the wait loop above.
        *write = \buffer + \writePos

        bytesFree = \bufferSize - \bytesAvailable
        While bytesFree > 0 And size > 0
          *write\b = *memory\b
          *write + 1
          *memory + 1
          bytesFree - 1
          size - 1
          \writePos + 1
          If (\writePos = \bufferSize)
            ; wrap around to the start of the ring buffer
            \writePos = 0
            *write = \buffer
          EndIf
        Wend

        \bytesAvailable = \bufferSize - bytesFree

        If (\readerWaits)
          SignalSemaphore(\hSemaphoreNotEmpty)
        EndIf
      Wend

      UnlockMutex(\hMutex)
    EndWith

    ProcedureReturn #True
  EndProcedure
  201.  
  CompilerIf #PB_Compiler_Unicode
    ; Unicode builds only: write one Character by passing the address of the
    ; character argument and SizeOf(Character) to writeBlock.
    ; Returns whatever writeBlock returns.
    Procedure.i BufferedStream_writeCharacter(*this.BufferedStream, character.i)
      Protected outcome.i

      outcome = *this\writeBlock(@character, SizeOf(Character))

      ProcedureReturn outcome
    EndProcedure
  CompilerEndIf
  207.  
  ; Read one byte from the stream, blocking while the buffer is empty.
  ; Returns the byte read; returns #False (0) when the buffer is empty and the
  ; stream has been closed, which cannot be told apart from a stored 0 byte.
  Procedure.a BufferedStream_readAscii(*this.BufferedStreamS)
    Protected value.a

    LockMutex(*this\hMutex)

    While (*this\bytesAvailable = 0)
      If (*this\closed)
        ;TODO
        UnlockMutex(*this\hMutex)
        ProcedureReturn #False
      EndIf

      ; announce that a reader is waiting, then wait with the mutex released
      *this\readerWaits = #True
      UnlockMutex(*this\hMutex)
      WaitSemaphore(*this\hSemaphoreNotEmpty)
      LockMutex(*this\hMutex)
    Wend
    *this\readerWaits = #False

    ; take the byte at the read position and advance with wrap-around
    value = PeekA(*this\buffer + *this\readPos)
    *this\readPos = (*this\readPos + 1) % *this\bufferSize
    *this\bytesAvailable - 1

    If (*this\writerWaits)
      SignalSemaphore(*this\hSemaphoreNotFull)
    EndIf

    UnlockMutex(*this\hMutex)

    ProcedureReturn value
  EndProcedure
  237.  
  ; Read up to size bytes from the stream into *memory, blocking while the
  ; buffer is empty and the stream is still open.
  ;
  ; *memory: destination for the bytes read.
  ; size:    number of bytes requested; values <= 0 read nothing.
  ; Returns: number of bytes actually copied. This is less than size only if
  ;          the stream was closed and drained before the request was satisfied
  ;          (0 if nothing could be read at all).
  Procedure.i BufferedStream_readBlock(*this.BufferedStreamS, *memory.Byte, size.i)
    Protected *read.Byte, initSize.i = size

    With *this
      LockMutex(\hMutex)

      While size > 0
        While (\bytesAvailable = 0)
          If (\closed)
            UnlockMutex(\hMutex)
            ; report the bytes already copied instead of discarding the count
            ProcedureReturn initSize - size
          EndIf
          \readerWaits = #True
          UnlockMutex(\hMutex)
          WaitSemaphore(\hSemaphoreNotEmpty)
          LockMutex(\hMutex)
        Wend
        \readerWaits = #False

        ; Derive the read pointer only now: \readPos can change while the
        ; mutex is released in the wait loop above.
        *read = \buffer + \readPos

        While \bytesAvailable > 0 And size > 0
          *memory\b = *read\b
          *memory + 1
          *read + 1
          \bytesAvailable - 1
          size - 1
          \readPos + 1
          If (\readPos = \bufferSize)
            ; wrap around to the start of the ring buffer
            \readPos = 0
            *read = \buffer
          EndIf
        Wend

        If (\writerWaits)
          SignalSemaphore(\hSemaphoreNotFull)
        EndIf
      Wend

      UnlockMutex(\hMutex)

    EndWith

    ProcedureReturn initSize - size
  EndProcedure
  282.  
  ; Read one line of text from the stream, one Character at a time via readBlock.
  ;
  ; bufferSize: initial capacity of the result string in characters and the amount
  ;             by which it is grown each time it fills up; values below 1 become 1.
  ; Returns:    the characters read up to, but not including, the terminator.
  ;             Reading stops at LF, at a 0 character, or when readBlock does not
  ;             deliver a full Character.
  Procedure.s BufferedStream_readLine(*this.BufferedStream, bufferSize.i = 1024)
    Protected result.s, char.c, *r.Character, rLength.i, last13.c = #False

    If (bufferSize < 1)
      bufferSize = 1
    EndIf

    With *this
      ; Pre-size the string and write characters straight into its memory through *r.
      result = Space(bufferSize)
      *r = @result
      rLength = 0
      Repeat
        If (Not \readBlock(@char, SizeOf(Character)) = SizeOf(Character))
          Break
        EndIf

        If (char = 0) ;String was terminated with 0
          Break
        ElseIf (char = 10) ;LF was read
          Break
        ElseIf (char = 13) ;CR was read, but an LF may still follow
          ; NOTE(review): a CR that is already pending is not emitted when another CR follows.
          last13 = #True
          Continue
        EndIf
        If (last13)
          ; The held-back CR was not followed by a terminator, so it belongs to the line.
          *r\c = 13
          *r + SizeOf(Character)
          rLength + 1
          If (rLength % bufferSize = 0)
            ; String is full: append another bufferSize characters and re-derive *r
            ; from the string's current address.
            result + Space(bufferSize)
            *r = @result + rLength * SizeOf(Character)
          EndIf
        EndIf
        last13 = #False

        *r\c = char
        *r + SizeOf(Character)
        rLength + 1
        If (rLength % bufferSize = 0)
          ; String is full: append another bufferSize characters and re-derive *r
          ; from the string's current address.
          result + Space(bufferSize)
          *r = @result + rLength * SizeOf(Character)
        EndIf
      ForEver
      ; Terminate the string after the last character written, cutting off the padding.
      *r\c = 0
    EndWith

    ProcedureReturn result
  EndProcedure
  331.  
  ; Return the number of bytes currently stored in the buffer,
  ; read while holding the stream mutex.
  Procedure.i BufferedStream_bytesAvailable(*this.BufferedStreamS)
    Protected count.i

    count = 0
    LockMutex(*this\hMutex)
    count = *this\bytesAvailable
    UnlockMutex(*this\hMutex)

    ProcedureReturn count
  EndProcedure
  342.  
  ; Discard the next bytes of the stream.
  ; Bytes already in the buffer are dropped immediately; if more bytes are
  ; requested than are buffered, the remainder is remembered in \skip and
  ; swallowed from subsequent writes.
  ;
  ; bytes:   number of bytes to skip; must be greater than 0.
  ; Returns: #False if bytes <= 0, otherwise #True.
  Procedure.i BufferedStream_skip(*this.BufferedStreamS, bytes.i)
    If (bytes <= 0)
      ProcedureReturn #False
    EndIf
    With *this
      LockMutex(\hMutex)
      \skip + bytes
      If (\skip > \bytesAvailable)
        ; Drop everything buffered and keep the remainder pending for future writes.
        \skip - \bytesAvailable
        \readPos = \writePos
        \bytesAvailable = 0
      Else
        ; Everything to skip is already buffered: advance the read position
        ; first, then clear the pending count (the advance needs its value).
        \readPos = (\readPos + \skip) % \bufferSize
        \bytesAvailable - \skip
        \skip = 0
      EndIf
      ; Space was freed, so let a waiting writer continue.
      If (\writerWaits)
        SignalSemaphore(\hSemaphoreNotFull)
      EndIf
      UnlockMutex(\hMutex)
    EndWith

    ProcedureReturn #True
  EndProcedure
  365.  
  ; Method table for BufferedStream objects.
  ; The entries must appear in the same order as the methods of Interface BufferedStream.
  DataSection
    BufferedStream_vTable:
    Data.i @BufferedStream_close(), @BufferedStream_isClosed()
    Data.i @BufferedStream_writeAscii()
    ; WriteCharacter slot: the Character-sized writer in Unicode builds,
    ; otherwise the single-byte writer.
    CompilerIf #PB_Compiler_Unicode
      Data.i @BufferedStream_writeCharacter()
    CompilerElse
      Data.i @BufferedStream_writeAscii()
    CompilerEndIf
    Data.i @BufferedStream_writeBlock()
    Data.i @BufferedStream_readAscii(), @BufferedStream_readBlock(), @BufferedStream_readLine()
    Data.i @BufferedStream_bytesAvailable(), @BufferedStream_skip()
  EndDataSection
  379.  
  380.  
  381. ;=============================================== E X A M P L E ===============================================
  382. CompilerIf Not #PB_Compiler_IsIncludeFile
  383.  
  ; Set to #True by FillThread when it has finished writing; not read anywhere in this example.
  Global a.i = #False
  385.  
  ;Thread procedure: writes 100 items into the stream. Each item is either a
  ;random character from "a".."z" or space, or occasionally a Windows-style
  ;line break (CR followed by LF).
  Procedure FillThread(bs.BufferedStream)
    Protected round.i, letter.c
    Protected alphabet.s = "abcdefghijklmnopqrstuvwxyz "

    For round = 1 To 100
      ;pick the character first, then decide whether a line break is written instead
      letter = Asc(Mid(alphabet, Random(26) + 1, 1))
      If (Random(10) <> 0)
        bs\WriteCharacter(letter)
        Debug "FillThread: write='" + Chr(letter) + "' (" + letter + ")"
      Else
        bs\WriteCharacter(13)
        Debug "FillThread: write CR"
        bs\WriteCharacter(10)
        Debug "FillThread: write LF"
      EndIf
      ;Enable the delay to see the output of ReadLines before the very end
      ;Delay(1)
    Next

    Debug "FillThread: STREAM CLOSED"
    a = #True
  EndProcedure
  406.  
  ;Thread procedure: reads lines until the stream has been closed and no bytes are left.
  Procedure ReadLines(bs.BufferedStream)
    Protected text.s
    Protected lentotal

    ;Skip the first 10 characters when reading
    bs\skip(10 * SizeOf(Character))

    While (Not bs\isClosed()) Or (bs\bytesAvailable() > 0)
      ;Read one line and add its length plus 2 (for CRLF) to the running total
      text = bs\readLine()
      lentotal = lentotal + Len(text) + 2
      Debug "ReadLines: '" + text + "'"
    Wend

    Debug lentotal
    Debug "ReadLines: END OF STREAM"
  EndProcedure
  423.  
  ;Create the stream with the default buffer size of 1024 bytes
  Define bs.BufferedStream = BufferedStream_new()
  Define.i t1, t2, t3

  ;BufferedStream_new() returns #False on failure; calling methods on it would crash
  If (bs = 0)
    Debug "BufferedStream_new() failed."
    End
  EndIf

  ;Create the threads that write into the stream and read from it
  t1 = CreateThread(@FillThread(), bs)
  t2 = CreateThread(@FillThread(), bs)
  t3 = CreateThread(@ReadLines(), bs)

  ;Wait for both writers, then close the stream so the reader can finish
  WaitThread(t1)
  Debug "Thread 1 ended."
  WaitThread(t2)
  Debug "Thread 2 ended."
  bs\close()
  WaitThread(t3)
  Debug "Thread 3 ended."
  440.  
  441. CompilerEndIf
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement