nd,ngui: remove memleaks and deadlocks mostly from v0.4.0
ci/woodpecker/push/woodpecker Pipeline was successful Details
ci/woodpecker/tag/woodpecker Pipeline was successful Details

the offending memleak printed out at program exit was introduced in
116fb3b5, specifically line 140 in src/bitcoindrpc.zig using incorrect
allocator.

while there, improved a bit overall memory handling by making sure
heap-alloc'ed messages are always deinit'ed.

another source of trouble was logging from within a sighandler: the
logger uses a thread mutex which may deadlock when the handler is
invoked by the OS.
pull/31/head v0.5.1
alex 11 months ago
parent 280bea40df
commit 29f6975601
Signed by: x1ddos
GPG Key ID: FDEFB4A63CBD8460

@ -137,7 +137,7 @@ pub const Client = struct {
fn parseResponse(self: Client, comptime m: Method, b: []const u8) !Result(m) {
var resp = try types.Deinitable(RpcResponse(m)).init(self.allocator);
errdefer resp.deinit();
resp.value = try std.json.parseFromSliceLeaky(RpcResponse(m), self.allocator, b, .{
resp.value = try std.json.parseFromSliceLeaky(RpcResponse(m), resp.arena.allocator(), b, .{
.ignore_unknown_fields = true,
.allocate = .alloc_always,
});
@ -150,6 +150,7 @@ pub const Client = struct {
return .{ .value = resp.value.result.?, .arena = resp.arena };
}
/// callers own returned value.
fn formatreq(self: *Client, comptime m: Method, args: MethodArgs(m)) ![]const u8 {
const req = RpcRequest(m){
.id = self.reqid.fetchAdd(1, .Monotonic),
@ -163,7 +164,7 @@ pub const Client = struct {
const auth = try self.getAuthBase64();
defer self.allocator.free(auth);
var bytes = std.ArrayList(u8).init(self.allocator);
var bytes = std.ArrayList(u8).init(self.allocator); // return value as owned slice
const w = bytes.writer();
try w.writeAll("POST / HTTP/1.0\r\n");
//try w.writeAll("Host: 127.0.0.1\n", .{});

@ -234,13 +234,13 @@ pub fn read(allocator: mem.Allocator, reader: anytype) !ParsedMessage {
else => Error.CommReadZeroLenInNonVoidTag,
};
}
var bytes = try allocator.alloc(u8, len);
defer allocator.free(bytes);
try reader.readNoEof(bytes);
return switch (tag) {
switch (tag) {
.ping, .pong, .poweroff, .standby, .wakeup => unreachable, // handled above
inline else => |t| {
var bytes = try allocator.alloc(u8, len);
defer allocator.free(bytes);
try reader.readNoEof(bytes);
var arena = try allocator.create(std.heap.ArenaAllocator);
arena.* = std.heap.ArenaAllocator.init(allocator);
errdefer {
@ -255,7 +255,7 @@ pub fn read(allocator: mem.Allocator, reader: anytype) !ParsedMessage {
};
return parsed;
},
};
}
}
/// outputs the message msg using writer.

@ -166,7 +166,7 @@ pub const Client = struct {
.listchannels => .{
.httpmethod = .GET,
.url = blk: {
var buf = std.ArrayList(u8).init(arena);
var buf = std.ArrayList(u8).init(arena); // free'ed when arena is deinit'ed, formatreq call site
const w = buf.writer();
try std.fmt.format(w, "{s}/v1/channels?peer_alias_lookup={}", .{ self.apibase, args.peer_alias_lookup });
if (args.status) |v| switch (v) {
@ -181,7 +181,7 @@ pub const Client = struct {
// TODO: sanitize; Uri.writeEscapedQuery(w, q);
try std.fmt.format(w, "&peer={s}", .{v});
}
break :blk try std.Uri.parse(buf.items);
break :blk try std.Uri.parse(buf.items); // uri point to the original buf
},
.headers = blk: {
var h = std.http.Headers{ .allocator = arena };

@ -132,7 +132,9 @@ fn parseArgs(gpa: std.mem.Allocator) !NdArgs {
var sigquit: std.Thread.ResetEvent = .{};
fn sighandler(sig: c_int) callconv(.C) void {
logger.info("received signal {}", .{sig});
if (sigquit.isSet()) {
return;
}
switch (sig) {
os.SIG.INT, os.SIG.TERM => sigquit.set(),
else => {},
@ -215,6 +217,7 @@ pub fn main() !void {
try os.sigaction(os.SIG.INT, &sa, null);
try os.sigaction(os.SIG.TERM, &sa, null);
sigquit.wait();
logger.info("sigquit: terminating ...", .{});
// reached here due to sig TERM or INT.
// tell deamon to terminate threads.

@ -376,9 +376,8 @@ fn commThreadLoop(self: *Daemon) void {
};
defer res.deinit();
const msg = res.value;
logger.debug("got msg: {s}", .{@tagName(msg)});
switch (msg) {
logger.debug("got msg: {s}", .{@tagName(res.value)});
switch (res.value) {
.pong => {
logger.info("received pong from ngui", .{});
},
@ -408,7 +407,7 @@ fn commThreadLoop(self: *Daemon) void {
// TODO: send err back to ngui
};
},
else => logger.warn("unhandled msg tag {s}", .{@tagName(msg)}),
else => |v| logger.warn("unhandled msg tag {s}", .{@tagName(v)}),
}
self.mu.lock();
@ -579,7 +578,7 @@ fn sendBitcoindReport(self: *Daemon) !void {
const mempool = try client.call(.getmempoolinfo, {});
defer mempool.deinit();
const balance: ?lndhttp.WalletBalance = blk: {
const balance: ?lndhttp.Client.Result(.walletbalance) = blk: { // lndhttp.WalletBalance
var lndc = lndhttp.Client.init(.{
.allocator = self.allocator,
.tlscert_path = "/home/lnd/.lnd/tls.cert",
@ -587,9 +586,9 @@ fn sendBitcoindReport(self: *Daemon) !void {
}) catch break :blk null;
defer lndc.deinit();
const res = lndc.call(.walletbalance, {}) catch break :blk null;
defer res.deinit();
break :blk res.value;
break :blk res;
};
defer if (balance) |bal| bal.deinit();
const btcrep: comm.Message.BitcoinReport = .{
.blocks = bcinfo.value.blocks,
@ -617,11 +616,11 @@ fn sendBitcoindReport(self: *Daemon) !void {
},
.balance = if (balance) |bal| .{
.source = .lnd,
.total = bal.total_balance,
.confirmed = bal.confirmed_balance,
.unconfirmed = bal.unconfirmed_balance,
.locked = bal.locked_balance,
.reserved = bal.reserved_balance_anchor_chan,
.total = bal.value.total_balance,
.confirmed = bal.value.confirmed_balance,
.unconfirmed = bal.value.unconfirmed_balance,
.locked = bal.value.locked_balance,
.reserved = bal.value.reserved_balance_anchor_chan,
} else null,
};

@ -226,15 +226,24 @@ fn commThreadLoopCycle() !void {
defer ui_mutex.unlock();
switch (state) {
.standby => switch (msg.value) {
.ping => try comm.pipeWrite(comm.Message.pong),
.ping => {
defer msg.deinit();
try comm.pipeWrite(comm.Message.pong);
},
.network_report,
.bitcoind_report,
.lightning_report,
=> last_report.replace(msg),
else => logger.debug("ignoring {s}: in standby", .{@tagName(msg.value)}),
else => {
logger.debug("ignoring {s}: in standby", .{@tagName(msg.value)});
msg.deinit();
},
},
.active, .alert => switch (msg.value) {
.ping => try comm.pipeWrite(comm.Message.pong),
.ping => {
defer msg.deinit();
try comm.pipeWrite(comm.Message.pong);
},
.poweroff_progress => |rep| {
ui.poweroff.updateStatus(rep) catch |err| logger.err("poweroff.updateStatus: {any}", .{err});
msg.deinit();
@ -253,8 +262,12 @@ fn commThreadLoopCycle() !void {
},
.settings => |sett| {
ui.settings.update(sett) catch |err| logger.err("settings.update: {any}", .{err});
msg.deinit();
},
else => {
logger.warn("unhandled msg tag {s}", .{@tagName(msg.value)});
msg.deinit();
},
else => logger.warn("unhandled msg tag {s}", .{@tagName(msg.value)}),
},
}
}
@ -342,7 +355,9 @@ fn usage(prog: []const u8) !void {
/// handles sig TERM and INT: makes the program exit.
fn sighandler(sig: c_int) callconv(.C) void {
logger.info("received signal {}", .{sig});
if (sigquit.isSet()) {
return;
}
switch (sig) {
os.SIG.INT, os.SIG.TERM => sigquit.set(),
else => {},
@ -399,9 +414,13 @@ pub fn main() anyerror!void {
try os.sigaction(os.SIG.INT, &sa, null);
try os.sigaction(os.SIG.TERM, &sa, null);
sigquit.wait();
logger.info("sigquit: terminating ...", .{});
// assuming nd won't ever send any more messages,
// so should be safe to touch last_report without a fence.
last_report.deinit();
logger.info("main terminated", .{});
// let the OS rip off UI and comm threads
}
test "tick" {