using concurrent::ActorPool
using afConcurrent::SynchronizedState
** Provides 'synchronized' multi-thread access to a mutable 'Buf'.
**
** 'SynchronizedBuf' creates a 'Buf' in its own thread and provides access to it via the
** *read* and *write* methods, and complementary 'InStream' and 'OutStream' implementations.
**
** 'SynchronizedBuf' is different to the default 'Buf.toImmutable()' instance because
** 'SynchronizedBuf' is mutable, designed to be constantly written to by one thread / stream,
** and constantly read by anther.
**
** pre>
** .--->--->--- --->--->---
** | Producer | Sync | Consumer |
** ▲ Thread ▼ ---> Buf <--> ▲ Thread ▼
** | Loop | | Loop |
** ---<---<--- ---<---<---
** <pre
**
** Note that 'SynchronizedBuf' grows unbounded. Until, that is, reading catches up with the
** writing; at which point the internal Buf is cleared and reset to initial capacity.
**
** See [Threaded Streams]`http://fantom.org/forum/topic/2586` on the Fantom forum for the initial design.
const class SynchronizedBuf {
private const SynchronizedState threadState
new make(ActorPool actorPool, Int capacity := 1024) {
threadState = SynchronizedState(actorPool) |->Obj| { SynchronizedBufState(capacity) }
}
** Creates and returns a thread safe 'OutStream' wrapper for this 'Buf'.
** 'OutStream' instances should be cached for re-use.
**
** Note you need to call 'flush()' to make data available to the 'InStream'.
OutStream out() {
ThreadedOutStream(this)
}
** Creates and returns a thread safe 'InStream' wrapper for this 'Buf'.
** 'InStream' instances should be cached for re-use.
InStream in() {
ThreadedInStream(this)
}
** Write a byte to the output stream.
**
** This method returns immediately, with the processing happening in the Buf thread.
This write(Int b) {
threadState.async |SynchronizedBufState state| {
state.write(b)
}
return this
}
** Write 'n' bytes from the given 'Buf' at it's current position to the output stream.
** If 'n' is defaulted to 'buf.remaining()', then the entire buffer is drained to the output stream.
**
** This method return immediately, with the processing happening in the Buf thread.
**
** Due to the use of 'Buf.toImmutable()' the given 'Buf' is cleared / invalidated upon return.
This writeBuf(Buf buf, Int n := buf.remaining) {
threadState.async |SynchronizedBufState state| {
state.writeBuf(buf, n)
}
return this
}
** Return the number of bytes available on the input stream without blocking.
** Return zero if no bytes available or unknown.
Int avail() {
threadState.sync |SynchronizedBufState state -> Int| {
state.avail
}
}
** Read the next unsigned byte from the input stream.
** Return 'null' if at end of stream.
Int? read() {
threadState.sync |SynchronizedBufState state -> Int?| {
state.read
}
}
** Attempt to read the next n bytes.
** Note this method may not read the full number of n bytes.
Buf readBuf(Int n) {
threadState.sync |SynchronizedBufState state -> Buf| {
state.readBuf(n)
}
}
** Pushback a byte so that it is the next byte to be read.
** There is a finite limit to the number of bytes which may be pushed back.
Void unread(Int b) {
threadState.sync |SynchronizedBufState state| {
state.unread(b)
}
}
** Attempt to skip 'n' number of bytes. Return the number of bytes
** actually skipped which may be equal to or lesser than 'n'.
Int skip(Int n) {
threadState.sync |SynchronizedBufState state -> Int| {
state.skip(n)
}
}
** Return the total number of bytes in the buffer. This is NOT the same as 'avail()'.
**
** The internal buffer forever expands until the contents have been read, then it is cleared.
Int size() {
threadState.sync |SynchronizedBufState state -> Int| {
state.size
}
}
@NoDoc
override Str toStr() {
threadState.sync |SynchronizedBufState state -> Str| {
"SynchronizedBuf - size=${state.size}, pos=${state.pos}"
}
}
}
internal class SynchronizedBufState {
private Int capactity
private Buf buf := Buf()
new make(Int capactity) {
this.capactity = capactity
}
Void write(Int b) {
pos := buf.pos
buf.seek(buf.size)
buf.out.write(b)
buf.seek(pos)
}
Void writeBuf(Buf b, Int n := b.remaining) {
if (n <= 0) return
pos := buf.pos
buf.seek(buf.size)
buf.out.writeBuf(b, n)
buf.seek(pos)
}
Int avail() {
buf.in.avail
}
Int? read() {
val := buf.in.read
clear
return val
}
Buf readBuf(Int n) {
b := Buf()
buf.in.readBuf(b, n)
clear
return b.toImmutable
}
Void unread(Int b) {
buf.in.unread(b)
}
Int skip(Int n) {
val := buf.in.skip(n)
clear
return val
}
Int pos() {
buf.pos
}
Int size() {
buf.size
}
private Void clear() {
if (avail == 0) {
buf.clear
buf.capacity = capactity
}
}
}
internal class ThreadedOutStream : OutStream {
private Buf buf
private SynchronizedBuf threadBuf
new make(SynchronizedBuf threadBuf) : super.make(null) {
this.buf = Buf()
this.threadBuf = threadBuf
}
** Write a byte to the output stream.
**
** Call 'flush()' to commit data to the main Actor thread.
override This write(Int byte) {
this.buf.write(byte)
return this
}
** Write n bytes from the specified Buf at it's current position to
** the output stream.
**
** Call 'flush()' to commit data to the main Actor thread.
override This writeBuf(Buf buf, Int n := buf.remaining) {
this.buf.writeBuf(buf, n)
return this
}
** Flush the stream so any buffered bytes are written out.
override This flush() {
threadBuf.writeBuf(this.buf.toImmutable)
this.buf.clear
return this
}
** Does nothing and returns true.
override Bool close() {
true
}
}
internal class ThreadedInStream : InStream {
private SynchronizedBuf threadBuf
new make(SynchronizedBuf threadBuf) : super.make(null) {
this.threadBuf = threadBuf
}
** Return the number of bytes available on input stream without
** blocking. Return zero if no bytes available or it is unknown.
override Int avail() {
threadBuf.avail
}
** Read the next unsigned byte from the input stream.
override Int? read() {
threadBuf.read
}
** Attempt to read the next n bytes into the Buf at it's current
** position. The buffer will be grown as needed. Return the number
** of bytes read and increment buf's size and position accordingly.
**
** Note this method may not read the full number of n bytes, use
** `readBufFully` if you must block until all n bytes read.
override Int? readBuf(Buf buf, Int n) {
b := threadBuf.readBuf(n)
s := b.size
buf.writeBuf(b)
return s
}
** Pushback a byte so that it is the next byte to be read. There
** is a finite limit to the number of bytes which may be pushed
** back. Return this.
override This unread(Int b) {
threadBuf.unread(b)
return this
}
** Does nothing and returns true.
override Bool close() {
true
}
** Attempt to skip 'n' number of bytes. Return the number of bytes
** actually skipped which may be equal to or lesser than n.
override Int skip(Int n) {
threadBuf.skip(n)
}
}