using concurrent::Actorusing concurrent::ActorPoolusing afConcurrent::SynchronizedListusing afConcurrent::SynchronizedStateusing inet::TcpSocketinternalconstclass VideoReader {privateconst Log log := Drone#.pod.logprivateconst ActorPool actorPoolprivateconst SynchronizedList listenersprivateconst SynchronizedState mutexnew make(Drone drone, Int port, ActorPool actorPool){this.actorPool = actorPoolthis.listeners = SynchronizedList(actorPool){it.valType = |Buf, PaveHeader|# }this.mutex = SynchronizedState(actorPool) |->Obj?| { VideoReaderImpl(drone, port)}}** Only internal listeners should be added here. Void addListener(|Buf, PaveHeader| f){ listeners.add(f)} Void removeListener(|Buf, PaveHeader| f){ listeners.remove(f)} Void connect(){ mutex.withState |VideoReaderImpl reader| { reader.connectif(reader.isConnected && !actorPool.isStopped) readVidData}} Bool isConnected(){ mutex.getState |VideoReaderImpl reader->Bool| { reader.isConnected}} Void disconnect(){ mutex.getState |VideoReaderImpl reader| { reader.disconnect}}private Void readVidData(){ mutex.withState |VideoReaderImpl reader| { doReadVidData(reader)if(reader.isConnected && !actorPool.isStopped) readVidData}}private Void doReadVidData(VideoReaderImpl reader){ pave := reader.receiveif(pave != null){// call internal listeners listeners.each {try((|Buf, PaveHeader|)it).call(pave.payload, pave)catch(Err err) err.trace}} pave = null}}internalclass VideoReaderImpl {const Log log := Drone#.pod.logconst Int portconst Drone droneconst NetworkConfig config TcpSocket? socketnew make(Drone drone, Int port){this.drone = dronethis.port = portthis.config = drone.networkConfig} Void connect(){if(isConnected) disconnectthis.socket = TcpSocket {it.options.receiveTimeout = config.tcpReceiveTimeout}.connect(config.droneIpAddr, port, config.actionTimeout)} Bool isConnected(){ socket != null && socket.isConnected} Void disconnect(){ socket?.close socket = null} PaveHeader? receive(){ in := socket.in { endian = Endian.little }try// may need to wait until we have all the header data -> readBufFully() ??return PaveHeader { signature = in.readChars(4) version = uint8(in) videoCodec = uint8(in) headerSize = uint16(in) payloadSize = uint32(in) encodedStreamWidth = uint16(in) encodedStreamHeight = uint16(in) displayWidth = uint16(in) displayHeight = uint16(in) frameNumber = uint32(in) timestamp = 1ms * uint32(in) totalChunks = uint8(in) chunkIndex = uint8(in) frameType = uint8(in) control = uint8(in) streamBytePosition = uint32(in) + (uint32(in).shiftl(32)) streamId = uint16(in) totalSlices = uint8(in) sliceIndex = uint8(in) header1Size = uint8(in) header2Size = uint8(in) in.skip(2) advertisedSize = uint32(in) in.skip(12)if(signature != "PaVE")// meh - lets carry on regardless log.warn("Invalid PaVE signature: ${signature}")// stupid kludge for https://projects.ardrone.org/issues/show/159 in.skip(headerSize - 64) payload = in.readBufFully(null, payloadSize)}catch(Err err){// log err as this could mess up our position in the stream// TODO maybe auto-disconnect / re-connect to re-establish stream position? log.err("Could not decode Video data - $err.msg")returnnull}}privatestatic Bool bool (InStream in){ in.readU1 != 0 }privatestatic Int uint8 (InStream in){ in.readU1 }privatestatic Int uint16 (InStream in){ in.readU2 }privatestatic Int uint32 (InStream in){ in.readU4 }privatestatic Int int16 (InStream in){ in.readS2 }privatestatic Int int32 (InStream in){ in.readS4 }privatestatic Int int64 (InStream in){ in.readS8 }privatestatic Float float32 (InStream in){ Float.makeBits32(in.readU4)}privatestatic Float double64(InStream in){ Float.makeBits(in.readS8)}}** Parrot Video Encapsulation (PaVE) headers for video frame data.** Passed to the [drone.onVideoFrame()]`Drone.onVideoFrame` event hook. constclass PaveHeader {** "PaVE" - used to identify the start of frameconst Str signature** Version codeconst Int version** Codec of the following frameconst Int videoCodec** Size of the parrot_video_encapsulation_tconst Int headerSize** Amount of data following this PaVEconst Int payloadSize** Example: 640const Int encodedStreamWidth** Example: 368const Int encodedStreamHeight** Example: 640const Int displayWidth** Example: 360const Int displayHeight** Frame position inside the current streamconst Int frameNumber** In millisecondsconst Duration timestamp** Number of UDP packets containing the current decodable payload - currently unusedconst Int totalChunks** Position of the packet - first chunk is #0 - currenty unusedconst Int chunkIndex** I-frame, P-frame - parrot_video_encapsulation_frametypes_tconst Int frameType** Special commands like end-of-stream or advertised framesconst Int control** Byte position of the current payload in the encoded streamconst Int streamBytePosition** This ID indentifies packets that should be recorded togetherconst Int streamId** Number of slices composing the current frameconst Int totalSlices** Position of the current slice in the frameconst Int sliceIndex** H.264 only : size of SPS inside payload - no SPS present if value is zeroconst Int header1Size** H.264 only : size of PPS inside payload - no PPS present if value is zeroconst Int header2Size** Size of frames announced as advertised framesconst Int advertisedSize ** The raw video frame data const Buf payload @NoDocnew make(|This| f){ f(this)}}