sourceredis::RedisClient.fan

//
// Copyright (c) 2023, Novant LLC
// Licensed under the MIT License
//
// History:
//   8 Jan 2023  Andy Frank  Creation
//

using concurrent
using inet

**************************************************************************
** RedisClient
**************************************************************************

** Redis client.
**
** Commands are dispatched through a private actor which lazily opens
** and caches one `RedisConn` to the configured server.  Call `close`
** when the client is no longer needed to release that connection.
const class RedisClient
{

//////////////////////////////////////////////////////////////////////////
// Lifecycle
//////////////////////////////////////////////////////////////////////////

  ** Create a new client instance for given host and port.  No
  ** connection is opened until the first command is invoked.
  new make(Str host, Int port := 6379)
  {
    this.host = host
    this.port = port
  }

  ** Host name of Redis server.
  const Str host

  ** Port number of Redis server.
  const Int port

  ** Close this client: release the connection to the server (if one
  ** was opened) and stop the backing actor pool.  Requests already
  ** queued are processed first.  It is safe to call this method more
  ** than once.
  Void close()
  {
    // the connection lives in the actor's locals, so the actor has to
    // close it; sending is not allowed once the pool has been stopped
    if (!pool.isStopped)
    {
      try { actor.send(RMsg('c')).get }
      catch (Err err) { log.debug("Failed to close connection", err) }
    }
    pool.stop.join
  }

  ** Log instance.
  internal const Log log := Log("redis", false)

//////////////////////////////////////////////////////////////////////////
// Base API
//////////////////////////////////////////////////////////////////////////

  ** Get the value for given key.
  Str? get(Str key)
  {
    invoke(["GET", key])
  }

  ** Set the given key to value, if 'val' is null this method deletes
  ** the given key (see `del`). If 'px' is non-null expire this key
  ** after the given timeout in milliseconds.
  Void set(Str key, Obj? val, Duration? px := null)
  {
    // a null value means delete the key
    if (val == null) { del(key); return }

    // else set
    req := ["SET", key, val]
    if (px != null) req.add("PX").add(toMillis(px))
    invoke(req)
  }

  ** Set the given key to value only if key does not exist. Returns
  ** 'true' if set was successful, or 'false' if set failed due to
  ** already existing key.  If 'px' is non-null expire this key after
  ** the given timeout in milliseconds.
  Bool setnx(Str key, Obj val, Duration? px := null)
  {
    req := ["SET", key, val, "NX"]
    if (px != null) req.add("PX").add(toMillis(px))

    // a null reply indicates the key was not set
    return invoke(req) != null
  }

  ** Delete the given key value.
  Void del(Str key)
  {
    invoke(["DEL", key])
  }

  ** Invoke the given command and return response, where 'args' is
  ** the command name followed by its arguments, for example
  ** '["SET", "foo", 5]'.  Blocks until the response is available.
  Obj? invoke(Obj[] args)
  {
    // NOTE: the response comes back wrapped in Unsafe, which is fine
    // since we can guarantee the actor never touches that reference
    // again.  The args are also passed as Unsafe (see RMsg) to avoid
    // the cost of serialization; in _most_ cases this is fine, but it
    // does leave an edge case: the list is shared with the actor, so
    // it must not be modified by another thread while the request is
    // in flight.

    Unsafe u := actor.send(RMsg('v', args)).get
    return u.val
  }

  ** Pipeline multiple `invoke` requests and return batched results,
  ** where each item of 'invokes' is an argument list as accepted by
  ** `invoke`.  Blocks until the responses are available.
  Obj?[] pipeline(Obj[] invokes)
  {
    // NOTE: the same Unsafe considerations as 'invoke' apply: the
    // 'invokes' list is shared with the actor rather than serialized,
    // so it must not be modified by another thread while the request
    // is in flight.

    Unsafe u := actor.send(RMsg('p', invokes)).get
    return u.val
  }

//////////////////////////////////////////////////////////////////////////
// Expire
//////////////////////////////////////////////////////////////////////////

  ** Expire given key after given 'seconds' has elapsed, where
  ** timeout must be in even second intervals.  Throws ArgErr if
  ** the timeout is less than one second.
  Void expire(Str key, Duration seconds)
  {
    sec := toSec(seconds)
    invoke(["EXPIRE", key, sec])
  }

  ** Expire given key when the given 'timestamp' has been reached,
  ** where 'timestamp' has a resolution of whole seconds.
  Void expireat(Str key, DateTime timestamp)
  {
    // convert millis to whole seconds
    unix := timestamp.toJava / 1000
    invoke(["EXPIREAT", key, unix])
  }

  ** Expire given key after given 'milliseconds' has elapsed, where
  ** timeout must be in even millisecond intervals.  Throws ArgErr
  ** if the timeout is less than one millisecond.
  Void pexpire(Str key, Duration milliseconds)
  {
    ms := toMillis(milliseconds)
    invoke(["PEXPIRE", key, ms])
  }

//////////////////////////////////////////////////////////////////////////
// Incr
//////////////////////////////////////////////////////////////////////////

  ** Increments the number stored at key by one. If the key does
  ** not exist, it is set to 0 before performing the operation.
  ** If 'px' is non-null expire this key after the given timeout
  ** in milliseconds. Returns the value of the key after the increment.
  Int incr(Str key, Duration? px := null)
  {
    return invokeExpiring(["INCR", key], key, px)
  }

  ** Increments the number stored at key by 'delta'. If the key
  ** does not exist, it is set to 0 before performing the operation.
  ** If 'px' is non-null expire this key after the given timeout
  ** in milliseconds. Returns the value of the key after the increment.
  Int incrby(Str key, Int delta, Duration? px := null)
  {
    return invokeExpiring(["INCRBY", key, delta], key, px)
  }

  ** Increment the string representing a floating point number
  ** stored at 'key' by the specified 'delta'. If the key does not
  ** exist, it is set to 0 before performing the operation. If 'px'
  ** is non-null expire this key after the given timeout in
  ** milliseconds. Returns the value of the key after the increment.
  Float incrbyfloat(Str key, Float delta, Duration? px := null)
  {
    // the reply is the string form of the new value
    Str res := invokeExpiring(["INCRBYFLOAT", key, delta], key, px)
    return res.toFloat
  }

  ** Invoke 'cmd' and return its response.  If 'px' is non-null then
  ** 'cmd' is pipelined together with a 'PEXPIRE' of 'key' and the
  ** response of 'cmd' (the first pipelined reply) is returned.
  private Obj? invokeExpiring(Obj[] cmd, Str key, Duration? px)
  {
    if (px == null) return invoke(cmd)

    // TODO FIXIT: use MULTI transaction
    return pipeline([
      cmd,
      ["PEXPIRE", key, toMillis(px)],
    ]).first
  }

//////////////////////////////////////////////////////////////////////////
// Hash
//////////////////////////////////////////////////////////////////////////

  ** Get the hash field for given key.
  Str? hget(Str key, Str field)
  {
    invoke(["HGET", key, field])
  }

  ** Get the values of the given hash fields for given key.  If
  ** 'fields' is empty an empty list is returned without contacting
  ** the server.
  Str?[] hmget(Str key, Str[] fields)
  {
    // HMGET requires at least one field
    if (fields.isEmpty) return Str?[,]
    return invoke(["HMGET", key].addAll(fields))
  }

  ** Get all hash field values for given key.
  Str:Str hgetall(Str key)
  {
    map := Str:Str[:]
    List acc := invoke(["HGETALL", key])

    // reply is consumed as a flat list of alternating field, value
    for (i:=0; i<acc.size; i+=2)
    {
      k := acc[i]
      v := acc[i+1]
      map[k] = v
    }

    return map
  }

  ** Set the hash field to the given value for key.
  Void hset(Str key, Str field, Obj val)
  {
    invoke(["HSET", key, field, val])
  }

  ** Set all hash values in 'vals' for given key.  If 'vals' is
  ** empty this method is a no-op.
  Void hmset(Str key, Str:Obj vals)
  {
    // HMSET requires at least one field/value pair
    if (vals.isEmpty) return

    acc := Obj["HMSET", key]
    vals.each |v,k| { acc.add(k).add(v) }
    invoke(acc)
  }

  ** Delete given hash field for key.
  Void hdel(Str key, Str field)
  {
    invoke(["HDEL", key, field])
  }

  ** Convenience for 'hincrby(key, field, 1)'
  Int hincr(Str key, Str field)
  {
    hincrby(key, field, 1)
  }

  ** Increments the number stored at 'field' in the hash stored at
  ** 'key' by given 'delta'. If the field does not exist, it is set
  ** to 0 before performing the operation.
  Int hincrby(Str key, Str field, Int delta)
  {
    invoke(["HINCRBY", key, field, delta])
  }

  ** Increment the string representing a floating point number stored
  ** at 'field' in the hash stored at 'key' by the specified 'delta'.
  ** If the key does not exist, it is set to 0 before performing the
  ** operation.
  Float hincrbyfloat(Str key, Str field, Float delta)
  {
    // the reply is the string form of the new value
    Str res := invoke(["HINCRBYFLOAT", key, field, delta])
    return res.toFloat
  }

//////////////////////////////////////////////////////////////////////////
// Misc API
//////////////////////////////////////////////////////////////////////////

  ** Returns information about the memory usage of server.  The
  ** returned map keeps the order in which the server reported
  ** the entries.
  Str:Obj memStats()
  {
    List acc := invoke(["MEMORY", "STATS"])
    map := Str:Obj[:] { it.ordered=true }

    // reply is consumed as a flat list of alternating name, value
    for (i:=0; i<acc.size; i+=2)
    {
      k := acc[i]
      v := acc[i+1]
      map[k] = v
    }
    return map
  }

//////////////////////////////////////////////////////////////////////////
// Support
//////////////////////////////////////////////////////////////////////////

  ** Convert duration to even millis or throw ArgErr if < 1ms
  private Int toMillis(Duration d)
  {
    ms := d.toMillis
    if (ms < 1) throw ArgErr("Non-zero timeout in milliseconds required")
    return ms
  }

  ** Convert duration to even seconds or throw ArgErr if < 1sec
  private Int toSec(Duration d)
  {
    sec := d.toSec
    if (sec < 1) throw ArgErr("Non-zero timeout in seconds required")
    return sec
  }

  ** Close and forget the connection cached in the current actor's
  ** locals, if there is one.  Only meaningful when called from
  ** within 'actor'.
  private Void teardown()
  {
    // forget the connection first so it is never reused, even if
    // closing it fails
    RedisConn? c := Actor.locals.remove("c")
    try { c?.close }
    catch (Err err) { log.debug("Failed to close connection", err) }
  }

//////////////////////////////////////////////////////////////////////////
// Actor
//////////////////////////////////////////////////////////////////////////

  // Actor which owns the connection; message ops are:
  //   'v': invoke a single command
  //   'p': invoke a pipeline of commands
  //   'c': close the connection (if open)
  private const ActorPool pool := ActorPool { name="RedisClient" }
  private const Actor actor := Actor(pool) |msg|
  {
    try
    {
      RMsg m := msg

      // close must not open a connection just to close it again
      if (m.op == 'c')
      {
        teardown
        return null
      }

      // lazily open the connection and cache it for later messages
      RedisConn? c := Actor.locals["c"]
      if (c == null) Actor.locals["c"] = c = RedisConn(host, port)

      switch (m.op)
      {
        case 'v': return Unsafe(c.invoke(m.a))
        case 'p': return Unsafe(c.pipeline(m.a))
        default: throw ArgErr("Unknown op '${m.op.toChar}'")
      }
    }
    catch (Err err)
    {
      // TODO: this could be smarter; only teardown for network errs?
      log.err("Unexpected error", err)

      // drop the connection so the next request starts fresh; teardown
      // never throws so the original error is always what propagates
      teardown
      throw err
    }
    return null
  }
}

**************************************************************************
** RMsg
**************************************************************************

** Message passed to the RedisClient actor.  The payload is wrapped
** in an Unsafe so it can be handed to the actor by reference even
** though it is not immutable.
internal const class RMsg
{
  ** Create a message for the given op code with an optional payload.
  new make(Int op, Obj? a := null)
  {
    this.op = op
    this.ua = Unsafe(a)
  }

  ** Op code identifying the request.
  const Int op

  ** Payload passed to the constructor, or null if there was none.
  Obj? a() { ua.val }

  // always assigned by the constructor, so never null; a missing
  // payload is represented by a wrapper holding null
  private const Unsafe ua
}