local http = require("socket.http") local url = require("socket.url") local ltn12 = require("ltn12") local socket = require("socket") local socketutil = require("socketutil") local rapidjson = require("rapidjson") local XRPCClient = { pds_url = nil, handle = nil, app_password = nil, access_token = nil, refresh_token = nil, timeout = 30, on_credentials_updated = nil, -- Callback for when credentials are refreshed } function XRPCClient:new(o) o = o or {} setmetatable(o, self) self.__index = self return o end function XRPCClient:setPDS(pds_url) self.pds_url = pds_url end function XRPCClient:setAuth(access_token, refresh_token) self.access_token = access_token if refresh_token then self.refresh_token = refresh_token end end function XRPCClient:setCredentials(handle, app_password) self.handle = handle self.app_password = app_password end function XRPCClient:setCredentialsCallback(callback) self.on_credentials_updated = callback end -- Notify that credentials were updated function XRPCClient:notifyCredentialsUpdated(access_token, refresh_token) if self.on_credentials_updated then self.on_credentials_updated(access_token, refresh_token) end end function XRPCClient:call(opts) opts = opts or {} local method = opts.method local params = opts.params local body = opts.body local skip_renewal = opts.skip_renewal or false local pds = opts.pds or self.pds_url if not pds then return nil, "PDS URL not set" end local endpoint = pds .. "/xrpc/" .. method -- Add query parameters if provided if params and next(params) then local query_parts = {} for k, v in pairs(params) do table.insert(query_parts, k .. "=" .. url.escape(tostring(v))) end endpoint = endpoint .. "?" .. table.concat(query_parts, "&") end local request_body = nil local source = nil local is_post = body ~= nil local headers = { ["Accept"] = "application/json", } -- Handle request body (POST only) if is_post then headers["Content-Type"] = "application/json" -- Add authorization header for POST requests if token is set if self.access_token then headers["Authorization"] = "Bearer " .. self.access_token end local encoded, err = rapidjson.encode(body) if err then return nil, "Failed to encode JSON: " .. err end request_body = encoded headers["Content-Length"] = tostring(#request_body) source = ltn12.source.string(request_body) end local sink = {} local request = { url = endpoint, method = is_post and "POST" or "GET", sink = ltn12.sink.table(sink), source = source, headers = headers, } socketutil:set_timeout(self.timeout, self.timeout) local code, response_headers = socket.skip(1, http.request(request)) socketutil:reset_timeout() if not code then return nil, "HTTP request failed: " .. tostring(response_headers) end local response_body = table.concat(sink) -- Handle authentication errors with automatic session renewal local is_auth_error = code == 401 or (code == 400 and response_body and string.find(string.lower(response_body), "expired")) if is_auth_error and not skip_renewal then -- Try to renew session (refresh token -> app password fallback) local renewed, renew_err = self:renewSession() if renewed then -- Session renewed successfully, retry the original request -- Rebuild request with new authorization header if is_post and self.access_token then headers["Authorization"] = "Bearer " .. self.access_token end sink = {} request.sink = ltn12.sink.table(sink) if is_post then request.source = ltn12.source.string(request_body) end request.headers = headers socketutil:set_timeout(self.timeout, self.timeout) code, response_headers = socket.skip(1, http.request(request)) socketutil:reset_timeout() if not code then return nil, "HTTP request failed after session renewal: " .. tostring(response_headers) end response_body = table.concat(sink) else -- Could not renew session return nil, "Session expired and renewal failed: " .. (renew_err or "unknown error") end end -- Handle non-200 responses if code ~= 200 then local error_msg if response_body and response_body ~= "" then local error_data = rapidjson.decode(response_body) if error_data and error_data.message then error_msg = error_data.message else error_msg = response_body end else error_msg = "HTTP " .. code end return nil, error_msg end -- Decode response if response_body and response_body ~= "" then local decoded, err = rapidjson.decode(response_body) if err then return nil, "Failed to decode JSON response: " .. err end return decoded, nil end return {}, nil end -- Resolve PDS from handle using Slingshot service function XRPCClient:resolvePDS(handle) local response, err = self:call({ method = "com.bad-example.identity.resolveMiniDoc", params = { identifier = handle }, pds = "https://slingshot.microcosm.blue", }) if err or not response then return nil, "Failed to resolve PDS for handle: " .. err end if not response.pds then return nil, "Invalid response from Slingshot service" end return response.pds, nil end -- Create session (login) function XRPCClient:createSession(identifier, password) local response, err = self:call({ method = "com.atproto.server.createSession", body = { identifier = identifier, password = password, }, skip_renewal = true, -- Don't auto-renew during initial login }) if err or not response then return nil, err end if response.accessJwt then self:setAuth(response.accessJwt, response.refreshJwt) -- Note: We don't call notifyCredentialsUpdated here since the caller -- of createSession is expected to handle storing credentials end return response, nil end -- Validate current session by making a test request function XRPCClient:validateSession() if not self.access_token then return false, "No access token" end -- Make a lightweight request to check if session is valid local response, err = self:call({ method = "com.atproto.server.getSession", body = {}, skip_renewal = true, -- Don't auto-renew during validation }) return response ~= nil, err end -- Refresh session using refresh token function XRPCClient:refreshSession() if not self.refresh_token then return nil, "No refresh token available" end -- Temporarily store the current access token local old_access_token = self.access_token -- Use refresh token for this request self.access_token = self.refresh_token local response, err = self:call({ method = "com.atproto.server.refreshSession", body = {}, skip_renewal = true, -- Don't recurse during refresh }) if err or not response then -- Restore old access token on failure self.access_token = old_access_token return nil, err end -- Update tokens with new values if response.accessJwt then self:setAuth(response.accessJwt, response.refreshJwt) self:notifyCredentialsUpdated(response.accessJwt, response.refreshJwt) end return response, nil end -- Renew session with multi-level fallback -- 1. Try to use refresh token -- 2. If refresh token expired, create new session with app password function XRPCClient:renewSession() -- First, try to refresh with refresh token local response, err = self:refreshSession() if response then return true, nil end -- If refresh failed and we have app password, try to create new session if self.handle and self.app_password then local session, session_err = self:createSession(self.handle, self.app_password) if session then -- New session created successfully, tokens already set by createSession self:notifyCredentialsUpdated(session.accessJwt, session.refreshJwt) return true, nil end return false, "Failed to refresh session and create new session: " .. (session_err or "unknown error") end return false, "Failed to refresh session: " .. (err or "unknown error") .. ", and no app password available for fallback" end -- List records from a collection function XRPCClient:listRecords(repo, collection, limit, cursor) local params = { repo = repo, collection = collection, } if limit then params.limit = limit end if cursor then params.cursor = cursor end return self:call({ method = "com.atproto.repo.listRecords", params = params, }) end -- Get a single record function XRPCClient:getRecord(repo, collection, rkey) local params = { repo = repo, collection = collection, rkey = rkey, } return self:call({ method = "com.atproto.repo.getRecord", params = params, }) end -- Put (create or update) a record function XRPCClient:putRecord(repo, collection, rkey, record) return self:call({ method = "com.atproto.repo.putRecord", body = { repo = repo, collection = collection, rkey = rkey, record = record, }, }) end return XRPCClient