diff --git a/src/main.zig b/src/main.zig index b283d2e..e95010a 100644 --- a/src/main.zig +++ b/src/main.zig @@ -23,55 +23,154 @@ pub const RotInt = struct { allocator: std.mem.Allocator, // TODO: associate timestamps with this somehow offsets: AzEl = .{ .az = 0, .el = 0 }, - last_request: AzEl = .{ .az = 0, .el = 0 }, + requested_posture: AzEl = .{ .az = 0, .el = 0 }, current_posture: AzEl = .{ .az = 0, .el = 0 }, + flipflop: bool = false, + state: State = .initial, - // printbuffer: [128]u8 = undefined, termbuffer: std.io.BufferedWriter(4096, std.io.AnyWriter), vx: *vaxis.Vaxis, loop: *xev.Loop, - fake: FakeRotator = .{}, parser: rotctl.RotCtl = .{}, + last_command: rotctl.RotCommand = undefined, server: networking.Server = .{}, + rotator: networking.Client = .{}, + poller: xev.Timer, + poll_completion: xev.Completion = undefined, - pub fn initFixed(self: *RotInt) void { + const poll_interval: u64 = 1000; + + pub const State = enum { + initial, + rotator_connected, + rotator_ready, + server_connected, + }; + + pub fn initInPlace(self: *RotInt) !void { self.server.rotint = self; + self.rotator.rotint = self; + + const connect_addr = try std.net.Address.parseIp("127.0.0.1", 4533); + try self.rotator.connect(self.loop, connect_addr); + } + + pub fn stateEvent(self: *RotInt, event: State) void { + switch (event) { + .initial => {}, + .rotator_connected => if (self.state == .initial) { + self.warn("rotator connected", .{}); + self.sendRotatorCommand(.get_position); + self.state = .rotator_connected; + }, + .rotator_ready => if (self.state == .rotator_connected) { + self.warn("rotator ready", .{}); + const listen_addr = std.net.Address.parseIp("127.0.0.1", 42069) catch { + self.warn("bogus listen address", .{}); + return; + }; + self.server.listen(self.loop, listen_addr) catch { + self.warn("listen problem", .{}); + return; + }; + // demangle here to avoid causing initial moves + self.requested_posture = .{ + .az = self.current_posture.az - self.offsets.az, + .el = self.current_posture.el - self.offsets.el, + }; + self.state = .rotator_ready; + }, + .server_connected => if (self.state == .rotator_ready) { + self.warn("server listening", .{}); + self.state = .server_connected; + }, + } + } + + fn poll( + self_: ?*RotInt, + _: *xev.Loop, + _: *xev.Completion, + result: xev.Timer.RunError!void, + ) xev.CallbackAction { + const self = self_.?; + result catch { + self.warn("timer error???", .{}); + return .disarm; + }; + + if (self.flipflop) { + const mangled: AzEl = .{ + .az = self.requested_posture.az + self.offsets.az, + .el = self.requested_posture.el + self.offsets.el, + }; + self.sendRotatorCommand(.{ .set_position = mangled }); + } else { + self.sendRotatorCommand(.get_position); + } + self.flipflop = !self.flipflop; + + return .disarm; + } + + fn sendRotatorCommand(self: *RotInt, command: rotctl.RotCommand) void { + self.last_command = command; + self.rotator.sendCommand(self.loop, command); + } + + pub fn handleRotatorReply(self: *RotInt, res: []const u8) void { + const reply = self.parser.parseReplyFor(self.last_command, res) catch |err| switch (err) { + error.Incomplete => return, + error.InvalidParameter => { + return; + }, + }; + + switch (reply) { + .okay => {}, + .get_position => |pos| { + self.current_posture = pos; + if (self.state == .rotator_connected) self.stateEvent(.rotator_ready); + }, + .status => |code| if (code != .okay) + self.warn("rotctl error {s}", .{@tagName(code)}), + } + self.poller.run(self.loop, &self.poll_completion, poll_interval, RotInt, self, poll); } pub fn warn(_: *RotInt, comptime fmt: []const u8, args: anytype) void { log.warn(fmt, args); } - pub fn forwardRequest(self: *RotInt, req: []const u8) void { - var temp: [128]u8 = undefined; + pub fn handleControlRequest(self: *RotInt, req: []const u8) void { const command = self.parser.parseCommand(req) catch |err| switch (err) { error.Incomplete => return, error.NotSupported => { - const response = (rotctl.RotReply{ .status = .not_supported }).write(temp[0..]) catch { - self.warn("serialization failure", .{}); - return; - }; - self.server.writeResponse(self.loop, response); + self.server.respond(self.loop, .{ .status = .not_supported }); return; }, error.InvalidParameter => { - const response = (rotctl.RotReply{ .status = .invalid_parameter }).write(temp[0..]) catch { - self.warn("serialization failure", .{}); - return; - }; - self.server.writeResponse(self.loop, response); + self.server.respond(self.loop, .{ .status = .invalid_parameter }); return; }, }; - // if command is `quit`, we should disconnect the client after our reply has been sent. - const reply = self.fake.request(command); - const response = reply.write(temp[0..]) catch { - self.warn("serialization failure", .{}); - return; - }; - self.server.writeResponse(self.loop, response); + switch (command) { + .get_position => self.server.respond(self.loop, .{ .get_position = self.current_posture }), + + .set_position => |pos| { + self.requested_posture = pos; + self.server.respond(self.loop, .okay); + }, + + .stop => self.server.respond(self.loop, .okay), + .park => self.server.respond(self.loop, .okay), + .quit => { + self.server.respond(self.loop, .okay); + self.server.should_disconnect = true; + }, + } } fn draw(self: *RotInt) !void { @@ -118,42 +217,6 @@ pub const RotInt = struct { } }; -const FakeRotator = struct { - const park_position: AzEl = .{ .az = 180, .el = 90 }; - - current: AzEl = park_position, - - fn request(self: *FakeRotator, command: rotctl.RotCommand) rotctl.RotReply { - return switch (command) { - .get_position => .{ .get_position = self.current }, - .set_position => |pos| blk: { - self.current = pos; - break :blk .{ .status = .okay }; - }, - .stop => .{ .status = .okay }, - .park => blk: { - self.current = park_position; - break :blk .{ .status = .okay }; - }, - .quit => .{ .status = .okay }, - }; - } -}; - -// const App = struct { -// const lower_limit: u8 = 30; -// const next_ms: u64 = 8; - -// allocator: std.mem.Allocator, -// vx: *vaxis.Vaxis, -// buffered_writer: std.io.BufferedWriter(4096, std.io.AnyWriter), -// color_idx: u8, -// dir: enum { -// up, -// down, -// }, -// }; - pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); @@ -177,31 +240,30 @@ pub fn main() !void { .termbuffer = tty.bufferedWriter(), .vx = &vx, .loop = &loop, + .poller = try xev.Timer.init(), }; - app.initFixed(); + try app.initInPlace(); var vx_loop: vaxis.xev.TtyWatcher(RotInt) = undefined; try vx_loop.init(&tty, &vx, &loop, &app, eventCallback); - // try vx.enterAltScreen(tty.anyWriter()); + try vx.enterAltScreen(tty.anyWriter()); try vx.queryTerminalSend(tty.anyWriter()); // Window size appears to be left uninitialized unless we manually set it here. This // seems sketchy to me (tty fd should be nonblocking for the event loop) const size = try vaxis.Tty.getWinsize(tty.fd); vx.resize(alloc, tty.anyWriter(), size) catch @panic("TODO"); - try app.server.listen(app.loop); - try loop.run(.until_done); } fn eventCallback( - ud: ?*RotInt, + self_: ?*RotInt, loop: *xev.Loop, watcher: *vaxis.xev.TtyWatcher(RotInt), event: vaxis.xev.Event, ) xev.CallbackAction { - const app = ud orelse unreachable; + const self = self_.?; switch (event) { .key_press => |key| keyp: { var mods = key.mods; @@ -216,7 +278,7 @@ fn eventCallback( vaxis.Key.down, vaxis.Key.kp_down => .{ .az = 0 * scale, .el = -0.1 * scale }, 'l' => { if (std.meta.eql(mods, .{ .ctrl = true })) - app.vx.queueRefresh(); + self.vx.queueRefresh(); break :keyp; }, 'c' => { @@ -236,13 +298,11 @@ fn eventCallback( else => break :keyp, }; - _ = delta; - - // state.offsets.az += delta.az; - // state.offsets.el += delta.el; + self.offsets.az += delta.az; + self.offsets.el += delta.el; }, .winsize => |ws| { - watcher.vx.resize(app.allocator, watcher.tty.anyWriter(), ws) catch + watcher.vx.resize(self.allocator, watcher.tty.anyWriter(), ws) catch return .disarm; }, else => {}, diff --git a/src/networking.zig b/src/networking.zig index fa4c40d..b542021 100644 --- a/src/networking.zig +++ b/src/networking.zig @@ -1,9 +1,12 @@ const std = @import("std"); + const xev = @import("xev"); const RotInt = @import("./main.zig").RotInt; +const rotctl = @import("./rotctl.zig"); pub const Server = struct { + // rotint initializes itself rotint: *RotInt = undefined, listen_socket: xev.TCP = undefined, client_socket: ?xev.TCP = null, @@ -13,15 +16,31 @@ pub const Server = struct { // size than this. read_buffer: [128]u8 = undefined, write_buffer: [128]u8 = undefined, + should_disconnect: bool = false, - pub fn listen(self: *Server, loop: *xev.Loop) !void { - const address = try std.net.Address.parseIp4("127.0.0.1", 5435); + pub fn listen(self: *Server, loop: *xev.Loop, address: std.net.Address) !void { self.listen_socket = try xev.TCP.init(address); try self.listen_socket.bind(address); try self.listen_socket.listen(1); self.listen_socket.accept(loop, &self.read_completion, Server, self, acceptCallback); } + pub fn respond(self: *Server, loop: *xev.Loop, response: rotctl.RotReply) void { + const rep = response.write(self.write_buffer[0..]) catch { + self.rotint.warn("could not write response {s}", .{@tagName(response)}); + return; + }; + if (self.client_socket) |socket| + socket.write( + loop, + &self.write_completion, + .{ .slice = rep }, + Server, + self, + writeCallback, + ); + } + fn acceptCallback( self_: ?*Server, loop: *xev.Loop, @@ -35,15 +54,16 @@ pub const Server = struct { self.client_socket = result catch { // different error sets for different backends makes handling individual // errors here odd. Either way, if this fails, I think we just want to - // retry anyway. + // retry anyway (maybe this should be a fatal error instead?). return .rearm; }; + self.should_disconnect = false; // this is already using read_completion, but pass it explicitly here to be clear self.client_socket.?.read( loop, &self.read_completion, - .{ .slice = &self.read_buffer }, + .{ .slice = self.read_buffer[0..] }, Server, self, readCallback, @@ -59,48 +79,31 @@ pub const Server = struct { buf: xev.ReadBuffer, result: xev.TCP.ReadError!usize, ) xev.CallbackAction { + _ = socket; const self = self_.?; const n = result catch |err| switch (err) { // we can reuse the completion here in either of these error cases since we // disarm in both of them error.EOF => { - socket.shutdown(loop, completion, Server, self, shutdownCallback); - self.client_socket = null; + self.disconnectClient(loop, completion); return .disarm; }, else => { self.rotint.warn("server read unexpected err={}", .{err}); // TODO: if read failed unexpectedly, will shutting down the socket like // this work? - socket.shutdown(loop, completion, Server, self, shutdownCallback); - self.client_socket = null; + self.disconnectClient(loop, completion); return .disarm; }, }; // this is a request from gpredict to rotctl. We get rotint to take care of it // for us (it stores a copy of this buffer) - self.rotint.forwardRequest(buf.slice[0..n]); + self.rotint.handleControlRequest(buf.slice[0..n]); + // we could disarm until we have generated and sent our response.... return .rearm; } - pub fn writeResponse( - self: *Server, - loop: *xev.Loop, - response: []const u8, - ) void { - @memcpy(self.write_buffer[0..response.len], response); - if (self.client_socket) |sock| - sock.write( - loop, - &self.write_completion, - .{ .slice = self.write_buffer[0..response.len] }, - Server, - self, - writeCallback, - ); - } - fn writeCallback( self_: ?*Server, loop: *xev.Loop, @@ -109,22 +112,32 @@ pub const Server = struct { buf: xev.WriteBuffer, result: xev.TCP.WriteError!usize, ) xev.CallbackAction { + _ = socket; _ = buf; const self = self_.?; _ = result catch |err| { // we're in trouble. Probably this is due to an unexpected client disconnect? self.rotint.warn("server write unexpected err={}", .{err}); - socket.shutdown(loop, completion, Server, self, shutdownCallback); - self.client_socket = null; + self.disconnectClient(loop, completion); return .disarm; }; - // inform rotint of successful write? + if (self.should_disconnect) { + self.disconnectClient(loop, completion); + return .disarm; + } return .disarm; } + fn disconnectClient(self: *Server, loop: *xev.Loop, completion: *xev.Completion) void { + if (self.client_socket) |sock| { + sock.shutdown(loop, completion, Server, self, shutdownCallback); + self.client_socket = null; + } + } + fn shutdownCallback( self_: ?*Server, loop: *xev.Loop, @@ -135,6 +148,7 @@ pub const Server = struct { _ = result catch {}; // shutdown only closes the write side of the socket. It's not entirely clear to // me if shutdown actually needs to be called before close. + // This can be called with either the read or write completion. socket.close(loop, completion, Server, self_.?, closeCallback); return .disarm; } @@ -157,3 +171,133 @@ pub const Server = struct { return .disarm; } }; + +pub const Client = struct { + // rotint initializes itself. could use @fieldParentPtr("rotator", self) also + rotint: *RotInt = undefined, + socket: xev.TCP = undefined, + read_completion: xev.Completion = undefined, + write_completion: xev.Completion = undefined, + read_buffer: [128]u8 = undefined, + write_buffer: [128]u8 = undefined, + + pub fn connect(self: *Client, loop: *xev.Loop, address: std.net.Address) !void { + self.socket = try xev.TCP.init(address); + self.socket.connect(loop, &self.read_completion, address, Client, self, connectCallback); + } + + pub fn sendCommand(self: *Client, loop: *xev.Loop, command: rotctl.RotCommand) void { + const req = command.write(self.write_buffer[0..]) catch { + self.rotint.warn("could not write command {s}", .{@tagName(command)}); + return; + }; + self.socket.write( + loop, + &self.write_completion, + .{ .slice = req }, + Client, + self, + writeCallback, + ); + } + + fn disconnect(self: *Client, loop: *xev.Loop, completion: *xev.Completion) void { + self.socket.shutdown(loop, completion, Client, self, shutdownCallback); + } + + fn connectCallback( + self_: ?*Client, + loop: *xev.Loop, + completion: *xev.Completion, + socket: xev.TCP, + result: xev.TCP.ConnectError!void, + ) xev.CallbackAction { + _ = completion; + + const self = self_.?; + + _ = result catch |err| { + self.rotint.warn("connect failed {}", .{err}); + // retry bayeb + return .rearm; + }; + + self.rotint.stateEvent(.rotator_connected); + socket.read( + loop, + &self.read_completion, + .{ .slice = self.read_buffer[0..] }, + Client, + self, + readCallback, + ); + return .disarm; + } + + fn writeCallback( + self_: ?*Client, + loop: *xev.Loop, + completion: *xev.Completion, + socket: xev.TCP, + buf: xev.WriteBuffer, + result: xev.TCP.WriteError!usize, + ) xev.CallbackAction { + const self = self_.?; + _ = result catch { + self.disconnect(loop, completion); + return .disarm; + }; + _ = socket; + _ = buf; + + return .disarm; + } + + fn readCallback( + self_: ?*Client, + loop: *xev.Loop, + completion: *xev.Completion, + socket: xev.TCP, + buf: xev.ReadBuffer, + result: xev.TCP.ReadError!usize, + ) xev.CallbackAction { + _ = socket; + + const self = self_.?; + const n = result catch { + self.disconnect(loop, completion); + return .disarm; + }; + + self.rotint.handleRotatorReply(buf.slice[0..n]); + return .rearm; + } + + fn shutdownCallback( + self_: ?*Client, + loop: *xev.Loop, + completion: *xev.Completion, + socket: xev.TCP, + result: xev.TCP.ShutdownError!void, + ) xev.CallbackAction { + const self = self_.?; + _ = result catch {}; + socket.close(loop, completion, Client, self, closeCallback); + return .disarm; + } + + fn closeCallback( + self_: ?*Client, + loop: *xev.Loop, + completion: *xev.Completion, + socket: xev.TCP, + result: xev.TCP.CloseError!void, + ) xev.CallbackAction { + _ = self_; + _ = loop; + _ = completion; + _ = socket; + _ = result catch {}; + return .disarm; + } +}; diff --git a/src/rotctl.zig b/src/rotctl.zig index 2d74bed..38b7db9 100644 --- a/src/rotctl.zig +++ b/src/rotctl.zig @@ -18,38 +18,33 @@ pub const RotCtl = struct { const ParseError = error{Incomplete}; pub fn parseCommand(self: *RotCtl, incoming: []const u8) (ParseError || RotCommand.ParseError)!RotCommand { - log.info("wbuf: '{s}'", .{self.writebuf[0..self.wlen]}); @memcpy(self.writebuf[self.wlen .. self.wlen + incoming.len], incoming); self.wlen += incoming.len; - log.info("wbuf: '{s}'", .{self.writebuf[0..self.wlen]}); const end = std.mem.indexOfScalarPos(u8, self.writebuf[0..], 0, '\n') orelse return error.Incomplete; defer { self.wlen = shiftBuf(&self.writebuf[0..self.wlen], end + 1); - log.info("wbuf: '{s}'", .{self.writebuf[0..self.wlen]}); } return try RotCommand.parse(self.writebuf[0..end]); } pub fn parseReplyFor(self: *RotCtl, command: RotCommand, incoming: []const u8) (ParseError || RotReply.ParseError)!RotReply { - log.info("rbuf: '{s}'", .{self.readbuf[0..self.rlen]}); @memcpy(self.readbuf[self.rlen .. self.rlen + incoming.len], incoming); self.rlen += incoming.len; - log.info("rbuf: '{s}'", .{self.readbuf[0..self.rlen]}); var lines: [3][]const u8 = undefined; var lslice: [][]const u8 = lines[0..0]; var offset: usize = 0; - while (std.mem.indexOfScalarPos(u8, self.readbuf[0..], offset, '\n')) |line| { - offset += line.len + 1; + while (std.mem.indexOfScalarPos(u8, self.readbuf[0..], offset, '\n')) |idx| { + const line = self.readbuf[offset..idx]; + offset += idx + 1; lslice.len += 1; lslice[lslice.len - 1] = line; if (lslice.len == command.expectedResponseLines() or (std.mem.startsWith(u8, line, "RPRT ") and line.len > 5)) { defer { self.rlen = shiftBuf(&self.readbuf[0..self.rlen], offset); - log.info("rbuf: '{s}'", .{self.readbuf[0..self.rlen]}); } return RotReply.parse(command, lslice); } @@ -173,6 +168,7 @@ pub const RotCommand = union(enum) { }; pub const RotReply = union(enum) { + okay, get_position: AzEl, status: Status, @@ -183,7 +179,10 @@ pub const RotReply = union(enum) { pub fn parse(command: RotCommand, lines: []const []const u8) ParseError!RotReply { return switch (command) { .set_position, .park, .stop, .quit => switch (lines.len) { - 1 => .{ .status = try Status.parse(lines[0]) }, + 1 => blk: { + const code = try Status.parse(lines[0]); + break :blk if (code == .okay) .okay else .{ .status = code }; + }, else => error.InvalidParameter, }, @@ -201,6 +200,7 @@ pub const RotReply = union(enum) { pub fn write(self: RotReply, buf: []u8) ![]const u8 { return switch (self) { .get_position => |pos| try std.fmt.bufPrint(buf, "{d:.1}\n{d:.1}\n", .{ pos.az, pos.el }), + .okay => try Status.okay.write(buf), .status => |code| try code.write(buf), }; }