Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- CompilerIf #PB_Compiler_IsMainFile
- EnableExplicit
- CompilerEndIf
- CompilerIf Not #PB_Compiler_Thread
- CompilerError "Must enable thread-safe!"
- CompilerEndIf
- Interface BufferedStream
- close.i() ;Close the stream.
- isClosed.i() ;Tell whether the stream was closed or not.
- writeAscii(ascii.i) ;Write a single Ascii character.
- WriteCharacter(character.i) ;Write a single Character.
- writeBlock(*memory, size.i) ;Write bytes into the stream.
- readAscii.a() ;Read a single Ascii character.
- readBlock.i(*memory, size.i) ;Read bytes into a buffer.
- readLine.s(bufferSize.i = 1024) ;Read a line of text.
- bytesAvailable.i() ;Tell how many bytes are available in the buffer.
- skip.i(bytes.i) ;Skip bytes.
- EndInterface
- Structure BufferedStreamS
- *vTable
- hSemaphoreNotEmpty.i
- hSemaphoreNotFull.i
- readerWaits.i
- writerWaits.i
- hMutex.i
- *buffer
- bufferSize.i
- writePos.i
- readPos.i
- bytesAvailable.i
- closed.i
- skip.i
- EndStructure
- Macro new(Classname)
- Classname#_new()
- EndMacro
- Procedure BufferedStream_new(bufferSize.i = 1024)
- Protected *this.BufferedStreamS = AllocateMemory(SizeOf(BufferedStreamS))
- If (bufferSize < 1)
- bufferSize = 1
- EndIf
- If (Not *this)
- ProcedureReturn #False
- EndIf
- With *this
- \vTable = ?BufferedStream_vTable
- \bufferSize = bufferSize
- \buffer = AllocateMemory(bufferSize, #PB_Memory_NoClear)
- If (Not \buffer)
- FreeMemory(*this)
- ProcedureReturn #False
- EndIf
- \hSemaphoreNotEmpty = CreateSemaphore(0)
- \hSemaphoreNotFull = CreateSemaphore(1)
- \readerWaits = #True
- \writerWaits = #False
- \hMutex = CreateMutex()
- \readPos = 0
- \writePos = 0
- \closed = #False
- \skip = 0
- \bytesAvailable = 0
- EndWith
- ProcedureReturn *this
- EndProcedure
- Procedure BufferedStream_close(*this.BufferedStreamS)
- With *this
- LockMutex(\hMutex)
- \closed = #True
- If (\readerWaits)
- SignalSemaphore(\hSemaphoreNotEmpty)
- EndIf
- UnlockMutex(\hMutex)
- EndWith
- EndProcedure
- Procedure.i BufferedStream_isClosed(*this.BufferedStreamS)
- Protected isClosed.i
- With *this
- LockMutex(\hMutex)
- isClosed = \closed
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn isClosed
- EndProcedure
- ; 0123456789
- ;read: ^
- ;write: ^
- Procedure.i BufferedStream_writeAscii(*this.BufferedStreamS, ascii.i)
- With *this
- LockMutex(\hMutex)
- If (\closed)
- UnlockMutex(\hMutex)
- ProcedureReturn #False
- EndIf
- If (\skip > 0)
- \skip - 1
- UnlockMutex(\hMutex)
- ProcedureReturn #True
- EndIf
- While (\bytesAvailable = \bufferSize)
- \writerWaits = #True
- UnlockMutex(\hMutex)
- WaitSemaphore(\hSemaphoreNotFull)
- LockMutex(\hMutex)
- Wend
- \writerWaits = #False
- PokeA(\buffer + \writePos, ascii)
- \writePos = (\writePos + 1) % \bufferSize
- \bytesAvailable + 1
- ;Debug "writeAscii: ascii=" + ascii + " writePos=" + \writePos
- If (\readerWaits)
- SignalSemaphore(\hSemaphoreNotEmpty)
- EndIf
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn #True
- EndProcedure
- Procedure.i BufferedStream_writeBlock(*this.BufferedStreamS, *memory.Byte, size.i)
- Protected bytesFree.i, *write.Byte
- With *this
- LockMutex(\hMutex)
- If (\closed)
- UnlockMutex(\hMutex)
- ProcedureReturn #False
- EndIf
- If (\skip > 0)
- If (size > \skip)
- *memory + (size - \skip)
- size - \skip
- Else
- \skip - size
- UnlockMutex(\hMutex)
- ProcedureReturn #True
- EndIf
- EndIf
- *write = \buffer + \writePos
- While size > 0
- While (\bytesAvailable = \bufferSize)
- \writerWaits = #True
- UnlockMutex(\hMutex)
- WaitSemaphore(\hSemaphoreNotFull)
- LockMutex(\hMutex)
- Wend
- \writerWaits = #False
- bytesFree = \bufferSize - \bytesAvailable
- While bytesFree > 0 And size > 0
- *write\b = *memory\b
- *write + 1
- *memory + 1
- bytesFree - 1
- size - 1
- \writePos + 1
- If (\writePos = \bufferSize)
- \writePos = 0
- *write = \buffer
- EndIf
- Wend
- \bytesAvailable = \bufferSize - bytesFree
- If (\readerWaits)
- SignalSemaphore(\hSemaphoreNotEmpty)
- EndIf
- Wend
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn #True
- EndProcedure
- CompilerIf #PB_Compiler_Unicode
- Procedure.i BufferedStream_writeCharacter(*this.BufferedStream, character.i)
- ProcedureReturn *this\writeBlock(@character, SizeOf(Character))
- EndProcedure
- CompilerEndIf
- Procedure.a BufferedStream_readAscii(*this.BufferedStreamS)
- Protected ascii.a
- With *this
- LockMutex(\hMutex)
- While (\bytesAvailable = 0)
- If (\closed)
- ;TODO
- UnlockMutex(\hMutex)
- ProcedureReturn #False
- EndIf
- \readerWaits = #True
- UnlockMutex(\hMutex)
- WaitSemaphore(\hSemaphoreNotEmpty)
- LockMutex(\hMutex)
- Wend
- \readerWaits = #False
- ascii = PeekA(\buffer + \readPos)
- \readPos = (\readPos + 1) % \bufferSize
- \bytesAvailable - 1
- If (\writerWaits)
- SignalSemaphore(\hSemaphoreNotFull)
- EndIf
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn ascii
- EndProcedure
- Procedure.i BufferedStream_readBlock(*this.BufferedStreamS, *memory.Byte, size.i)
- Protected *read.Byte, initSize.i = size
- With *this
- LockMutex(\hMutex)
- *read = \buffer + \readPos
- While size > 0; And (Not \closed)
- While (\bytesAvailable = 0)
- If (\closed)
- UnlockMutex(\hMutex)
- ProcedureReturn 0
- EndIf
- \readerWaits = #True
- UnlockMutex(\hMutex)
- WaitSemaphore(\hSemaphoreNotEmpty)
- LockMutex(\hMutex)
- Wend
- \readerWaits = #False
- While \bytesAvailable > 0 And size > 0; And (Not \closed)
- *memory\b = *read\b
- *memory + 1
- *read + 1
- \bytesAvailable - 1
- size - 1
- \readPos + 1
- If (\readPos = \bufferSize)
- \readPos = 0
- *read = \buffer
- EndIf
- Wend
- If (\writerWaits)
- SignalSemaphore(\hSemaphoreNotFull)
- EndIf
- Wend
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn initSize - size
- EndProcedure
- Procedure.s BufferedStream_readLine(*this.BufferedStream, bufferSize.i = 1024)
- Protected result.s, char.c, *r.Character, rLength.i, last13.c = #False
- If (bufferSize < 1)
- bufferSize = 1
- EndIf
- With *this
- result = Space(bufferSize)
- *r = @result
- rLength = 0
- Repeat
- If (Not \readBlock(@char, SizeOf(Character)) = SizeOf(Character))
- Break
- EndIf
- If (char = 0) ;String wurde mit 0 terminiert
- Break
- ElseIf (char = 10) ;LF wurde gelesen
- Break
- ElseIf (char = 13) ;CR wurde gelesen, aber vielleicht kommt noch ein LF
- last13 = #True
- Continue
- EndIf
- If (last13)
- *r\c = 13
- *r + SizeOf(Character)
- rLength + 1
- If (rLength % bufferSize = 0)
- result + Space(bufferSize)
- *r = @result + rLength * SizeOf(Character)
- EndIf
- EndIf
- last13 = #False
- *r\c = char
- *r + SizeOf(Character)
- rLength + 1
- If (rLength % bufferSize = 0)
- result + Space(bufferSize)
- *r = @result + rLength * SizeOf(Character)
- EndIf
- ForEver
- *r\c = 0
- EndWith
- ProcedureReturn result
- EndProcedure
- Procedure.i BufferedStream_bytesAvailable(*this.BufferedStreamS)
- Protected bytes.i = 0
- With *this
- LockMutex(\hMutex)
- bytes = \bytesAvailable
- UnlockMutex(\hMutex)
- EndWith
- ProcedureReturn bytes
- EndProcedure
- Procedure.i BufferedStream_skip(*this.BufferedStreamS, bytes.i)
- If (bytes <= 0)
- ProcedureReturn #False
- EndIf
- With *this
- LockMutex(\hMutex)
- \skip + bytes
- If (\skip > \bytesAvailable)
- \skip - \bytesAvailable
- \readPos = \writePos
- \bytesAvailable = 0
- Else
- \bytesAvailable - \skip
- \skip = 0
- \readPos = (\readPos + \skip) % \bufferSize
- EndIf
- If (\writerWaits)
- SignalSemaphore(\hSemaphoreNotFull)
- EndIf
- UnlockMutex(\hMutex)
- EndWith
- EndProcedure
- DataSection
- BufferedStream_vTable:
- Data.i @BufferedStream_close(), @BufferedStream_isClosed()
- Data.i @BufferedStream_writeAscii()
- CompilerIf #PB_Compiler_Unicode
- Data.i @BufferedStream_writeCharacter()
- CompilerElse
- Data.i @BufferedStream_writeAscii()
- CompilerEndIf
- Data.i @BufferedStream_writeBlock()
- Data.i @BufferedStream_readAscii(), @BufferedStream_readBlock(), @BufferedStream_readLine()
- Data.i @BufferedStream_bytesAvailable(), @BufferedStream_skip()
- EndDataSection
- ;=============================================== E X A M P L E ===============================================
- CompilerIf Not #PB_Compiler_IsIncludeFile
- Global a.i = #False
- ;Schreibt zufällige Buchstaben in den Stream und manchmal einen Zeilenumbruch im Windows-Stil
- Procedure FillThread(bs.BufferedStream)
- Protected i.i, c.c
- For i = 1 To 100
- c = Asc(Mid("abcdefghijklmnopqrstuvwxyz ", Random(26) + 1, 1))
- If (Random(10) = 0)
- bs\WriteCharacter(13)
- Debug "FillThread: write CR"
- bs\WriteCharacter(10)
- Debug "FillThread: write LF"
- Else
- bs\WriteCharacter(c)
- Debug "FillThread: write='" + Chr(c) + "' (" + c + ")"
- EndIf
- ;Delay aktivieren, wenn man die Ausgaben von ReadLines nicht erst am Schluss sehen will
- ;Delay(1)
- Next
- Debug "FillThread: STREAM CLOSED"
- a = #True
- EndProcedure
- ;Lies die Zeilen aus bis der Stream geschlossen wird und keine Bytes mehr da sind.
- Procedure ReadLines(bs.BufferedStream)
- Protected line.s
- ;Überspringe die ersten 10 Zeichen beim Auslesen
- bs\skip(10 * SizeOf(Character))
- While (Not bs\isClosed()) Or (bs\bytesAvailable() > 0)
- ;Lies eine Zeile aus
- line = bs\readLine()
- Protected lentotal = lentotal + Len(line) + 2 ; 2 = CRLF
- Debug "ReadLines: '" + line + "'"
- Wend
- Debug lentotal
- Debug "ReadLines: END OF STREAM"
- EndProcedure
- ;Stream mit Standard-Puffergröße von 1024 Bytes erstellen
- Define bs.BufferedStream = BufferedStream_new()
- Define.i t1, t2, t3
- ;Threads erstellen, die in den Stream schreiben bzw. aus ihm lesen
- t1 = CreateThread(@FillThread(), bs)
- t2 = CreateThread(@FillThread(), bs)
- t3 = CreateThread(@ReadLines(), bs)
- WaitThread(t1)
- Debug "Thread 1 ended."
- WaitThread(t2)
- Debug "Thread 2 ended."
- bs\close()
- WaitThread(t3)
- Debug "Thread 3 ended."
- CompilerEndIf
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement