sourceafMongo::MongoCur.fan

using afBson::BsonIO

** Mongo Cursors iterate over Collection data.
** 
** @See
**  - `https://www.mongodb.com/docs/manual/reference/command/getMore/`
**  - `https://www.mongodb.com/docs/manual/reference/command/killCursors/`
**  - `https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst`
class MongoCur {
    
    ** The underlying connection manager.
    const MongoConnMgr  connMgr
    
    ** The database this cursor iterates in.
    const Str           dbName
    
    ** The collection this cursor iterates over.
    const Str           colName
    
    ** The cursor ID.
    Int                 cursorId {
        private set
    }
    
    ** A configurable batch size.
    ** 
    ** 'null' indicates a Mongo default of '101'.
    Int?                batchSize {
        set {
            if (it == 0) it = null
            if (it != null && it < 0) it = it.abs
            &batchSize = it
        }
    }
    
    ** A configurable timeout for Mongo server operations.
    ** 
    ** 'null' indicates a Mongo default of forever.
    Duration?           maxTime {
        set {
            if (it == 0ms) it = null
            if (it != null && it < 0ms) it = it.abs
            &maxTime = it
        }
    }
    
    private [Str:Obj?][]?   _batch
    private Int             _batchIndex
    private Int             _totalIndex
    private MongoSess?      _session

    ** Creates a new Mongo Cursor with a first batch
    new make(MongoConnMgr connMgr, Str dbName, Str colName, Int cursorId, [Str:Obj?][]? firstBatch := null, Obj? session := null) {
        this.connMgr        = connMgr
        this.dbName         = dbName
        this.colName        = colName
        this.cursorId       = cursorId
        this._batch         = firstBatch
        this._session       = session
        this._totalIndex    = -1
    }
    
    ** Returns the next document from the cursor, or 'null'.
    ** 
    ** The cursor must be manually killed.
    ** pre>
    ** syntax: fantom
    ** 
    ** while (cursor.isAlive) {
    **     doc := cursor.next
    **     ...
    ** }
    ** cursor.kill
    ** <pre
    [Str:Obj?]? next() {
        if (isAlive == false)
            return null
        
        if (_isExhausted) {
            cur := MongoCmd(connMgr, dbName, "getMore", cursorId, _session)
                .add("collection",  colName)
                .add("batchSize",   batchSize)
                .add("maxTimeMS",   maxTime?.toMillis)
                .run["cursor"] as Str:Obj?
            
            this.cursorId   = cur["id"]
            this._batch     = cur["nextBatch"]
            this._batchIndex= 0
            
            if (this.cursorId == 0) {
                this._session?.endSession
                this._session = null
            }

            // very occasionally, we get an "IndexErr: 0" when returning _batch data,
            // presumably after getMore() returns 0 documents
            if (isAlive == false)
                return null
        }
        
        _totalIndex++
        return _batch[_batchIndex++]
    }
    
    ** Returns the current index of the last document.
    Int index() {
        _totalIndex
    }
    
    ** Kills this cursor.
    ** 
    ** No more documents will be returned from 'next()', 'each()', or 'toList()'.
    Void kill() {
        if (cursorId == 0)
            return
        
        res := MongoCmd(connMgr, dbName, "killCursors", colName, _session)
            .add("cursors", [cursorId])
            .run
        
        // there is no coming back from a Kill Cmd!
        this.cursorId       = 0
        this._batchIndex    = _batch.size
        
        // double check and log a warning if something seems afoot
        killed := res["cursorsKilled"] as Int[]
        if (killed.contains(cursorId) == false)
            connMgr.log.warn("Cursor (${cursorId}) not killed. Mongo says:\n" + BsonIO().print(res))
        
        this._session?.endSession
        this._session = null
    }

    ** Iterates over all *remaining* and unread documents.
    ** 
    ** This cursor is guaranteed to be killed.
    ** 
    ** pre>
    ** syntax: fantom
    ** 
    ** cursor.each |Str:Obj? doc, Int index| {
    **     ...
    ** }
    ** <pre
    Void each(|Str:Obj? doc, Int index| fn) {
        try while (isAlive) {
            doc := next
            if (doc != null)
                fn(doc, _totalIndex)
        }
        finally kill
    }

    ** Converts all *remaining* and unread documents to a List.
    ** The new list is typed based on the return type of the function.
    ** 
    ** This cursor is guaranteed to be killed.
    ** 
    ** pre>
    ** syntax: fantom
    ** 
    ** list := cursor.map |Str:Obj? doc, Int index -> Obj?| {
    **     ...
    ** }
    ** <pre
    Obj?[] map(|Str:Obj? doc, Int index->Obj?| fn) {
        type := fn.returns == Void# ? Obj?# : fn.returns
        list := List.make(type, 16)
        try while (isAlive) {
            doc := next
            if (doc != null) {
                obj := fn(doc, _totalIndex)
                list.add(obj)
            }
        }
        finally kill
        return list
    }

    ** Return all *remaining* and unread documents as a List.
    ** 
    ** This cursor is guaranteed to be killed.
    ** 
    ** Returns an empty list should the cursor be killed.
    [Str:Obj?][] toList() {
        docs := Str:Obj?[][,]
        docs.capacity = _batch.size // it's a good starting point!
        try while (isAlive) {
            doc := next
            if (doc != null)
                docs.add(doc)
        }
        finally kill
        return  docs
    }
    
    ** Returns 'true' if the cursor is alive on the server or more documents may be read.
    Bool isAlive() {
        isDead := _isExhausted && cursorId == 0
        return isDead == false
    }

    private Bool _isExhausted() {
        _batchIndex >= _batch.size
    }

    @NoDoc
    override Str toStr() { cursorId.toStr }
}