using concurrent::Actor
using concurrent::ActorPool
using concurrent::AtomicRef
using afConcurrent::SynchronizedList
using afConcurrent::SynchronizedState
using inet::TcpSocket

** Thread-safe front end for reading the drone's video stream.
**
** All socket work is delegated to a `VideoReaderImpl` instance that is held
** inside a 'SynchronizedState' and so is only ever touched from the given
** 'ActorPool'. Decoded frames and errors are handed to the (single) frame and
** error listeners.
internal const class VideoReader {
    private const Log               log                 := Drone#.pod.log
    private const Drone             drone
    private const ActorPool         actorPool
    private const AtomicRef         frameListenerRef    := AtomicRef()
    private const AtomicRef         errorListenerRef    := AtomicRef()
    private const SynchronizedState mutex
    private const Int               port

    ** Creates a reader for the given TCP 'port'.
    ** The underlying 'VideoReaderImpl' is created lazily by the state factory.
    new make(Drone drone, ActorPool actorPool, Int port) {
        this.drone      = drone
        this.actorPool  = actorPool
        this.port       = port
        this.mutex      = SynchronizedState(actorPool) |->Obj?| {
            VideoReaderImpl(drone.networkConfig, port)
        }
    }

    ** Only internal listeners should be added here.
    ** Replaces any previously set frame listener.
    Void setFrameListener(|Buf, PaveHeader| f) {
        frameListenerRef.val = f
    }

    ** Sets the listener that is called with errors raised while reading frames.
    ** Replaces any previously set error listener.
    Void setErrorListener(|Err| f) {
        errorListenerRef.val = f
    }

    ** Connects the underlying socket and, if that succeeded and the actor pool
    ** is still running, kicks off the read loop.
    Void connect() {
        mutex.getState |VideoReaderImpl reader| {
            reader.connect
            if (reader.isConnected && !actorPool.isStopped)
                readVidData
        }
    }

    ** Returns 'true' if the underlying socket is currently connected.
    Bool isConnected() {
        mutex.getState |VideoReaderImpl reader->Bool| {
            reader.isConnected
        }
    }

    ** Closes the underlying socket (best effort) and clears both listeners.
    Void disconnect() {
        // 'actorPool' is the very pool the mutex was created with;
        // once it has been stopped the state can no longer be reached
        if (!actorPool.isStopped)
            try mutex.getState |VideoReaderImpl reader| {
                reader.disconnect
            }
            catch { /* deliberately ignored - disconnect is best effort */ }

        frameListenerRef.val = null
        errorListenerRef.val = null
    }

    ** Reads one frame and then re-schedules itself for as long as the reader
    ** stays connected and the actor pool is running.
    private Void readVidData() {
        mutex.withState |VideoReaderImpl reader| {
            doReadVidData(reader)
            if (reader.isConnected && !actorPool.isStopped)
                readVidData
        }
    }

    ** Receives a single frame from 'reader' and dispatches it to the frame
    ** listener; errors are dispatched to the error listener.
    private Void doReadVidData(VideoReaderImpl reader) {
        PaveHeader? pave := null

        try pave = reader.receive
        catch (IOErr err) {
            // drone.isConnected is set to false *before* we stop the ActorPool,
            // so IOErrs raised while shutting down are not reported
            if (reader.isConnected && drone.isConnected)
                ((|Err|?) errorListenerRef.val)?.call(err)
        }
        catch (Err err) {
            ((|Err|?) errorListenerRef.val)?.call(err)
        }

        // call internal listeners
        if (pave != null)
            ((|Buf, PaveHeader|?) frameListenerRef.val)?.call(pave.payload, pave)
    }
}

** Mutable, non-thread-safe holder of the video TCP socket.
** Instances are created and used by `VideoReader` inside its 'SynchronizedState'.
internal class VideoReaderImpl {
    const Log           log     := Drone#.pod.log
    const Int           port
    const NetworkConfig config
          TcpSocket?    socket

    new make(NetworkConfig config, Int port) {
        this.config = config
        this.port   = port
    }

    ** Opens a new TCP connection to the drone, closing any existing socket first.
    ** Any 'Err' raised while connecting is re-thrown after the new socket has been closed.
    Void connect() {
        // always release the old socket - it may be non-null but no longer
        // connected, in which case it would otherwise be dropped without a close
        disconnect

        sock := TcpSocket {
            it.options.receiveTimeout = config.tcpReceiveTimeout
        }
        try sock.connect(config.droneIpAddr, port, config.actionTimeout)
        catch (Err err) {
            // don't leak the socket if the connection attempt fails
            sock.close
            throw err
        }
        this.socket = sock
    }

    ** Returns 'true' if a socket exists and reports itself as connected.
    Bool isConnected() {
        socket != null && socket.isConnected
    }

    ** Closes the socket (if any) and forgets it.
    Void disconnect() {
        socket?.close
        socket = null
    }

    ** Reads one PaVE header, and its payload, from the socket.
    ** Returns 'null' if there is no socket.
    PaveHeader? receive() {
        if (socket == null)
            return null

        in := socket.in { endian = Endian.little }

        // may need to wait until we have all the header data -> readBufFully() ??
        return PaveHeader {
            signature           = in.readChars(4)
            version             = uint8(in)
            videoCodec          = uint8(in)
            headerSize          = uint16(in)
            payloadSize         = uint32(in)
            encodedStreamWidth  = uint16(in)
            encodedStreamHeight = uint16(in)
            displayWidth        = uint16(in)
            displayHeight       = uint16(in)
            frameNumber         = uint32(in)
            timestamp           = 1ms * uint32(in)
            totalChunks         = uint8(in)
            chunkIndex          = uint8(in)
            frameType           = uint8(in)
            control             = uint8(in)
            // 64 bit value read as two 32 bit words, low word first
            streamBytePosition  = uint32(in) + (uint32(in).shiftl(32))
            streamId            = uint16(in)
            totalSlices         = uint8(in)
            sliceIndex          = uint8(in)
            header1Size         = uint8(in)
            header2Size         = uint8(in)
            in.skip(2)
            advertisedSize      = uint32(in)
            in.skip(12)

            if (signature != "PaVE")
                // meh - lets carry on regardless
                log.warn("Invalid PaVE signature: ${signature}")

            // stupid kludge for https://projects.ardrone.org/issues/show/159
            // 64 bytes have been consumed above; only skip when the advertised
            // header is larger than that, never ask for a negative skip
            if (headerSize > 64)
                in.skip(headerSize - 64)

            payload             = in.readBufFully(null, payloadSize)
        }
    }

    private static Bool  bool    (InStream in) { in.readU1 != 0 }
    private static Int   uint8   (InStream in) { in.readU1 }
    private static Int   uint16  (InStream in) { in.readU2 }
    private static Int   uint32  (InStream in) { in.readU4 }
    private static Int   int16   (InStream in) { in.readS2 }
    private static Int   int32   (InStream in) { in.readS4 }
    private static Int   int64   (InStream in) { in.readS8 }
    private static Float float32 (InStream in) { Float.makeBits32(in.readU4) }
    private static Float double64(InStream in) { Float.makeBits(in.readS8) }
}

** Parrot Video Encapsulation (PaVE) headers for video frame data.
** Passed to the [drone.onVideoFrame()]`Drone.onVideoFrame` event hook.
const class PaveHeader {
    ** "PaVE" - used to identify the start of frame
    const Int.Str? signature
    ** Version code
    const Int version
    ** Codec of the following frame
    const Int videoCodec
    ** Size of the parrot_video_encapsulation_t
    const Int headerSize
    ** Amount of data following this PaVE
    const Int payloadSize
    ** Example: 640
    const Int encodedStreamWidth
    ** Example: 368
    const Int encodedStreamHeight
    ** Example: 640
    const Int displayWidth
    ** Example: 360
    const Int displayHeight
    ** Frame position inside the current stream
    const Int frameNumber
    ** In milliseconds
    const Duration timestamp
    ** Number of UDP packets containing the current decodable payload - currently unused
    const Int totalChunks
    ** Position of the packet - first chunk is #0 - currently unused
    const Int chunkIndex
    ** I-frame, P-frame - parrot_video_encapsulation_frametypes_t
    const Int frameType
    ** Special commands like end-of-stream or advertised frames
    const Int control
    ** Byte position of the current payload in the encoded stream
    const Int streamBytePosition
    ** This ID identifies packets that should be recorded together
    const Int streamId
    ** Number of slices composing the current frame
    const Int totalSlices
    ** Position of the current slice in the frame
    const Int sliceIndex
    ** H.264 only : size of SPS inside payload - no SPS present if value is zero
    const Int header1Size
    ** H.264 only : size of PPS inside payload - no PPS present if value is zero
    const Int header2Size
    ** Size of frames announced as advertised frames
    const Int advertisedSize

    ** The raw video frame data
    const Buf payload

    @NoDoc
    new make(|This| f) { f(this) }
}