this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Port client to Zig 0.16 Io networking

zzstoatzz 1377fe24 7fdc8212

+63 -32
+19 -13
example.zig
··· 7 7 const redis = @import("redis"); 8 8 9 9 pub fn main() !void { 10 - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 10 + var gpa: std.heap.DebugAllocator(.{}) = .init; 11 11 defer _ = gpa.deinit(); 12 12 const allocator = gpa.allocator(); 13 13 14 14 // Connect to Redis 15 - var client = redis.Client.connect(allocator, "localhost", 6379) catch |err| { 15 + var client = redis.Client.connect(std.Options.debug_io, allocator, "localhost", 6379) catch |err| { 16 16 std.debug.print("Failed to connect: {}\n", .{err}); 17 17 return; 18 18 }; ··· 78 78 var hash = client.hashes(); 79 79 80 80 // HSET multiple fields 81 - _ = hash.hset("user:1", &.{ 81 + _ = hash.hsetMulti("user:1", &.{ 82 82 .{ "name", "alice" }, 83 83 .{ "email", "alice@example.com" }, 84 84 .{ "score", "100" }, ··· 110 110 var list = client.lists(); 111 111 112 112 // Clear and populate list 113 - _ = client.keys().del(&.{"tasks"}) catch {}; 113 + var keys = client.keys(); 114 + _ = keys.del(&.{"tasks"}) catch {}; 114 115 _ = list.rpush("tasks", &.{ "task1", "task2", "task3" }) catch return; 115 116 116 117 // LRANGE ··· 165 166 var zset = client.sortedSets(); 166 167 167 168 // Clear and add scores 168 - _ = client.keys().del(&.{"leaderboard"}) catch {}; 169 - _ = zset.zadd("leaderboard", &.{ 169 + var keys = client.keys(); 170 + _ = keys.del(&.{"leaderboard"}) catch {}; 171 + _ = zset.zaddMulti("leaderboard", .{}, &.{ 170 172 .{ .score = 100, .member = "alice" }, 171 173 .{ .score = 85, .member = "bob" }, 172 174 .{ .score = 92, .member = "charlie" }, 173 - }, .{}) catch return; 175 + }) catch return; 174 176 175 177 // ZRANGE with scores (top 3) 176 - const top = zset.zrangeWithScores("leaderboard", 0, -1, .{ .rev = true }) catch return; 178 + const top = zset.zrangeWithScores("leaderboard", 0, -1, .desc) catch return; 177 179 std.debug.print("Leaderboard (high to low):\n", .{}); 178 - for (top) |entry| { 179 - std.debug.print(" {s}: {d}\n", .{ entry.member, entry.score }); 180 + var i: usize = 0; 181 + while (i + 1 < top.len) : (i += 2) { 182 + const member = top[i].asString() orelse continue; 183 + const score = top[i + 1].asString() orelse continue; 184 + std.debug.print(" {s}: {s}\n", .{ member, score }); 180 185 } 181 186 182 187 // ZSCORE ··· 192 197 var stream = client.streams(); 193 198 194 199 // Delete and recreate stream 195 - _ = client.keys().del(&.{"events"}) catch {}; 200 + var keys = client.keys(); 201 + _ = keys.del(&.{"events"}) catch {}; 196 202 197 203 // XADD 198 204 const id1 = stream.xadd("events", .auto, &.{ ··· 218 224 for (entries) |entry| { 219 225 std.debug.print(" {s}: ", .{entry.id}); 220 226 for (entry.fields) |field| { 221 - std.debug.print("{s}={s} ", .{ field.name, field.value }); 227 + std.debug.print("{s}={s} ", .{ field.field, field.value }); 222 228 } 223 229 std.debug.print("\n", .{}); 224 230 } 225 231 226 232 // Consumer group example 227 233 _ = stream.xgroupDestroy("events", "workers") catch {}; // cleanup 228 - stream.xgroupCreate("events", "workers", "0", true) catch |err| { 234 + stream.xgroupCreate("events", "workers", "0") catch |err| { 229 235 std.debug.print("XGROUP CREATE error: {}\n", .{err}); 230 236 return; 231 237 };
+43 -18
src/client.zig
··· 27 27 //! - Use a connection pool (not provided here) 28 28 29 29 const std = @import("std"); 30 - const net = std.net; 30 + const Io = std.Io; 31 + const net = Io.net; 31 32 const Allocator = std.mem.Allocator; 32 33 33 34 const resp = @import("resp.zig"); ··· 46 47 pub const Client = struct { 47 48 const Self = @This(); 48 49 50 + io: Io, 49 51 stream: net.Stream, 50 52 allocator: Allocator, 51 53 read_buf: []u8, ··· 81 83 82 84 /// Connect to Redis with minimal configuration. 83 85 /// Use `connectWithConfig` for full control. 84 - pub fn connect(allocator: Allocator, host: []const u8, port: u16) ClientError!Self { 85 - return connectWithConfig(allocator, .{ .host = host, .port = port }); 86 + pub fn connect(io: Io, allocator: Allocator, host: []const u8, port: u16) ClientError!Self { 87 + return connectWithConfig(io, allocator, .{ .host = host, .port = port }); 86 88 } 87 89 88 90 /// Connect to Redis with full configuration. 89 - pub fn connectWithConfig(allocator: Allocator, config: Config) ClientError!Self { 91 + pub fn connectWithConfig(io: Io, allocator: Allocator, config: Config) ClientError!Self { 90 92 // Resolve address: try numeric IP first, then DNS 91 - const address = net.Address.parseIp(config.host, config.port) catch blk: { 92 - const list = net.getAddressList(allocator, config.host, config.port) catch { 93 + const tcp_stream = if (std.mem.eql(u8, config.host, "localhost")) blk: { 94 + const address = net.IpAddress{ .ip4 = net.Ip4Address.loopback(config.port) }; 95 + break :blk address.connect(io, .{ .mode = .stream }) catch { 96 + return ConnectionError.ConnectionRefused; 97 + }; 98 + } else if (net.IpAddress.parse(config.host, config.port)) |address| blk: { 99 + break :blk address.connect(io, .{ .mode = .stream }) catch { 100 + return ConnectionError.ConnectionRefused; 101 + }; 102 + } else |_| blk: { 103 + const host_name = net.HostName.init(config.host) catch { 104 + return ConnectionError.AddressResolutionFailed; 105 + }; 106 + break :blk host_name.connect(io, config.port, .{ .mode = .stream }) catch { 93 107 return ConnectionError.AddressResolutionFailed; 94 108 }; 95 - defer list.deinit(); 96 - if (list.addrs.len == 0) return ConnectionError.AddressResolutionFailed; 97 - break :blk list.addrs[0]; 98 109 }; 99 - 100 - const tcp_stream = net.tcpConnectToAddress(address) catch { 101 - return ConnectionError.ConnectionRefused; 102 - }; 103 - errdefer tcp_stream.close(); 110 + errdefer tcp_stream.close(io); 104 111 105 112 const read_buf = try allocator.alloc(u8, config.read_buffer_size); 106 113 errdefer allocator.free(read_buf); 107 114 108 115 var self = Self{ 116 + .io = io, 109 117 .stream = tcp_stream, 110 118 .allocator = allocator, 111 119 .read_buf = read_buf, ··· 134 142 self.allocator.free(buf); 135 143 } 136 144 self.allocator.free(self.read_buf); 137 - self.stream.close(); 145 + self.stream.close(self.io); 138 146 } 139 147 140 148 // ======================================================================== ··· 183 191 } 184 192 185 193 // Send command 186 - _ = try self.stream.write(cmd_buf[0..pos]); 194 + var write_buf: [4096]u8 = undefined; 195 + var writer = self.stream.writer(self.io, &write_buf); 196 + writer.interface.writeAll(cmd_buf[0..pos]) catch |err| switch (err) { 197 + error.WriteFailed => return writer.err orelse ProtocolError.ConnectionClosed, 198 + }; 199 + writer.interface.flush() catch |err| switch (err) { 200 + error.WriteFailed => return writer.err orelse ProtocolError.ConnectionClosed, 201 + }; 187 202 188 203 // Reset buffer state for new response 189 204 self.buf_len = 0; ··· 230 245 fn readResponse(self: *Self) ClientError!Value { 231 246 // Ensure we have at least one byte 232 247 if (self.buf_len == 0) { 233 - const n = try self.stream.read(self.read_buf); 248 + const n = try self.readFromStream(self.read_buf); 234 249 if (n == 0) return ProtocolError.ConnectionClosed; 235 250 self.buf_len = n; 236 251 } ··· 316 331 self.read_buf = new_buf; 317 332 } 318 333 319 - const n = try self.stream.read(self.read_buf[self.buf_len..]); 334 + const n = try self.readFromStream(self.read_buf[self.buf_len..]); 320 335 if (n == 0) return ProtocolError.ConnectionClosed; 321 336 self.buf_len += n; 337 + } 338 + 339 + fn readFromStream(self: *Self, dest: []u8) ClientError!usize { 340 + var empty: [0]u8 = .{}; 341 + var reader = self.stream.reader(self.io, &empty); 342 + var bufs = [_][]u8{dest}; 343 + return reader.interface.readVec(&bufs) catch |err| switch (err) { 344 + error.EndOfStream => return 0, 345 + error.ReadFailed => return reader.err orelse ProtocolError.ConnectionClosed, 346 + }; 322 347 } 323 348 324 349 // ========================================================================
+1 -1
src/resp.zig
··· 76 76 77 77 /// Combined error set for all client operations. 78 78 /// Use this for functions that might fail at any level. 79 - pub const ClientError = ConnectionError || ProtocolError || CommandError || std.mem.Allocator.Error || std.posix.ReadError || std.net.Stream.WriteError; 79 + pub const ClientError = ConnectionError || ProtocolError || CommandError || std.mem.Allocator.Error || std.Io.net.Stream.Reader.Error || std.Io.net.Stream.Writer.Error; 80 80 81 81 // ============================================================================ 82 82 // RESP Value Type