nd: add bitcoin core report sent to UI every min

this adds a very simple bitcoind RPC client able to make a few queries
like the blockchain and network status. the daemon uses the client
to get the bitcoind status and sends a report through comms periodically.

there's also a little playground program which simply dumps a query
result to stderr. built on demand with "zig build btcrpc".
alex 1 year ago
parent 7d1ab5cb78
commit cb2e98cd8b
Signed by: x1ddos
GPG Key ID: FDEFB4A63CBD8460

@ -135,6 +135,16 @@ pub fn build(b: *std.build.Builder) void {
guiplay_build_step.dependOn(&b.addInstallArtifact(guiplay).step);
guiplay_build_step.dependOn(ngui_build_step);
}
{
const btcrpc = b.addExecutable("btcrpc", "src/test/btcrpc.zig");
btcrpc.setTarget(target);
btcrpc.setBuildMode(mode);
btcrpc.strip = strip;
btcrpc.addPackagePath("bitcoindrpc", "src/nd/bitcoindrpc.zig");
const btcrpc_build_step = b.step("btcrpc", "bitcoind RPC client playground");
btcrpc_build_step.dependOn(&b.addInstallArtifact(btcrpc).step);
}
}
const DriverTarget = enum {

@ -27,6 +27,7 @@ pub const Message = union(MessageTag) {
network_report: NetworkReport,
get_network_report: GetNetworkReport,
poweroff_progress: PoweroffProgress,
bitcoind_report: BitcoindReport,
pub const WifiConnect = struct {
ssid: []const u8,
@ -52,6 +53,34 @@ pub const Message = union(MessageTag) {
err: ?[]const u8,
};
};
pub const BitcoindReport = struct {
blocks: u64,
headers: u64,
timestamp: u64, // unix epoch
hash: []const u8, // best block hash
ibd: bool, // initial block download
verifyprogress: u8, // 0-100%
diskusage: u64, // estimated size on disk, in bytes
version: []const u8, // bitcoin core version string
conn_in: u16,
conn_out: u16,
warnings: []const u8,
localaddr: []struct {
addr: []const u8,
port: u16,
score: i16,
},
mempool: struct {
loaded: bool,
txcount: usize,
usage: u64, // in memory, bytes
max: u64, // bytes
totalfee: f32, // in BTC
minfee: f32, // BTC/kvB
fullrbf: bool,
},
};
};
/// it is important to preserve ordinal values for future compatiblity,
@ -69,7 +98,9 @@ pub const MessageTag = enum(u16) {
wakeup = 0x08,
// nd -> ngui: reports poweroff progress
poweroff_progress = 0x09,
// next: 0x0a
// nd -> ngui: bitcoin core daemon status report
bitcoind_report = 0x0a,
// next: 0x0b
};
/// reads and parses a single message from the input stream reader.
@ -111,6 +142,9 @@ pub fn read(allocator: mem.Allocator, reader: anytype) !Message {
.poweroff_progress => Message{
.poweroff_progress = try json.parse(Message.PoweroffProgress, &jstream, jopt),
},
.bitcoind_report => Message{
.bitcoind_report = try json.parse(Message.BitcoindReport, &jstream, jopt),
},
};
}
@ -126,6 +160,7 @@ pub fn write(allocator: mem.Allocator, writer: anytype, msg: Message) !void {
.network_report => try json.stringify(msg.network_report, jopt, data.writer()),
.get_network_report => try json.stringify(msg.get_network_report, jopt, data.writer()),
.poweroff_progress => try json.stringify(msg.poweroff_progress, jopt, data.writer()),
.bitcoind_report => try json.stringify(msg.bitcoind_report, jopt, data.writer()),
}
if (data.items.len > std.math.maxInt(u64)) {
return Error.CommWriteTooLarge;

@ -20,6 +20,7 @@ const network = @import("network.zig");
const screen = @import("../ui/screen.zig");
const types = @import("../types.zig");
const SysService = @import("SysService.zig");
const bitcoindrpc = @import("bitcoindrpc.zig");
const logger = std.log.scoped(.daemon);
@ -50,6 +51,10 @@ want_wifi_scan: bool, // initiate wifi scan at the next loop cycle
network_report_ready: bool, // indicates whether the network status is ready to be sent
wifi_scan_in_progress: bool = false,
wpa_save_config_on_connected: bool = false,
// bitcoin flags
want_bitcoind_report: bool,
bitcoin_timer: time.Timer,
bitcoin_report_interval: u64 = 1 * time.ns_per_min,
/// system services actively managed by the daemon.
/// these are stop'ed during poweroff and their shutdown progress sent to ngui.
@ -82,6 +87,9 @@ pub fn init(a: std.mem.Allocator, r: std.fs.File.Reader, w: std.fs.File.Writer,
.want_network_report = true,
.want_wifi_scan = false,
.network_report_ready = true,
// report bitcoind status immediately on start
.want_bitcoind_report = true,
.bitcoin_timer = try time.Timer.start(),
};
}
@ -269,6 +277,14 @@ fn mainThreadLoopCycle(self: *Daemon) !void {
logger.err("network.sendReport: {any}", .{err});
}
}
if (self.want_bitcoind_report or self.bitcoin_timer.read() > self.bitcoin_report_interval) {
if (self.sendBitcoindReport()) {
self.bitcoin_timer.reset();
self.want_bitcoind_report = false;
} else |err| {
logger.err("sendBitcoinReport: {any}", .{err});
}
}
}
/// comm thread entry point: reads messages sent from ngui and acts accordinly.
@ -484,12 +500,54 @@ fn readWPACtrlMsg(self: *Daemon) !void {
}
}
fn sendBitcoindReport(self: *Daemon) !void {
var client = bitcoindrpc.Client{
.allocator = self.allocator,
.cookiepath = "/ssd/bitcoind/mainnet/.cookie",
};
const bcinfo = try client.call(.getblockchaininfo, {});
defer bcinfo.deinit();
const netinfo = try client.call(.getnetworkinfo, {});
defer netinfo.deinit();
const mempool = try client.call(.getmempoolinfo, {});
defer mempool.deinit();
const btcrep: comm.Message.BitcoindReport = .{
.blocks = bcinfo.value.blocks,
.headers = bcinfo.value.headers,
.timestamp = bcinfo.value.time,
.hash = bcinfo.value.bestblockhash,
.ibd = bcinfo.value.initialblockdownload,
.diskusage = bcinfo.value.size_on_disk,
.version = netinfo.value.subversion,
.conn_in = netinfo.value.connections_in,
.conn_out = netinfo.value.connections_out,
.warnings = bcinfo.value.warnings, // TODO: netinfo.result.warnings
.localaddr = &.{}, // TODO: populate
// something similar to this:
// @round(bcinfo.verificationprogress * 100)
.verifyprogress = 0,
.mempool = .{
.loaded = mempool.value.loaded,
.txcount = mempool.value.size,
.usage = mempool.value.usage,
.max = mempool.value.maxmempool,
.totalfee = mempool.value.total_fee,
.minfee = mempool.value.mempoolminfee,
.fullrbf = mempool.value.fullrbf,
},
};
try comm.write(self.allocator, self.uiwriter, .{ .bitcoind_report = btcrep });
}
test "start-stop" {
const t = std.testing;
const pipe = try types.IoPipe.create();
var daemon = try Daemon.init(t.allocator, pipe.reader(), pipe.writer(), "/dev/null");
daemon.want_network_report = false;
daemon.want_bitcoind_report = false;
try t.expect(daemon.state == .stopped);
try daemon.start();
@ -533,6 +591,7 @@ test "start-poweroff" {
const gui_reader = gui_stdin.reader();
var daemon = try Daemon.init(arena, gui_stdout.reader(), gui_stdin.writer(), "/dev/null");
daemon.want_network_report = false;
daemon.want_bitcoind_report = false;
defer {
daemon.deinit();
gui_stdin.close();

@ -0,0 +1,296 @@
//! a bitcoin core RPC client.
const std = @import("std");
const ArenaAllocator = std.heap.ArenaAllocator;
const Atomic = std.atomic.Atomic;
const base64enc = std.base64.standard.Encoder;
pub const Client = struct {
allocator: std.mem.Allocator,
cookiepath: []const u8,
addr: []const u8 = "127.0.0.1",
port: u16 = 8332,
// each request gets a new ID with a value of reqid.fetchAdd(1, .Monotonic)
reqid: Atomic(u64) = Atomic(u64).init(1),
pub const Method = enum {
getblockchaininfo,
getblockhash,
getmempoolinfo,
getnetworkinfo,
};
pub const RpcError = error{
// json-rpc 2.0
RpcInvalidRequest,
RpcMethodNotFound,
RpcInvalidParams,
RpcInternalError,
RpcParseError,
// general purpose errors
RpcMiscError,
RpcTypeError,
RpcInvalidAddressOrKey,
RpcOutOfMemory,
RpcInvalidParameter,
RpcDatabaseError,
RpcDeserializationError,
RpcVerifyError,
RpcVerifyRejected,
RpcVerifyAlreadyInChain,
RpcInWarmup,
RpcMethodDeprecated,
// p2p client errors
RpcClientNotConnected,
RpcClientInInitialDownload,
RpcClientNodeAlreadyAdded,
RpcClientNodeNotAdded,
RpcClientNodeNotConnected,
RpcClientInvalidIpOrSubnet,
RpcClientP2pDisabled,
RpcClientNodeCapacityReached,
// chain errors
RpcClientMempoolDisabled,
};
pub fn Result(comptime m: Method) type {
return struct {
value: ResultValue(m),
arena: *ArenaAllocator,
pub fn deinit(self: @This()) void {
const allocator = self.arena.child_allocator;
self.arena.deinit();
allocator.destroy(self.arena);
}
};
}
pub fn ResultValue(comptime m: Method) type {
return switch (m) {
.getblockchaininfo => BlockchainInfo,
.getblockhash => []const u8,
.getmempoolinfo => MempoolInfo,
.getnetworkinfo => NetworkInfo,
};
}
pub fn MethodArgs(comptime m: Method) type {
return switch (m) {
.getblockchaininfo, .getmempoolinfo, .getnetworkinfo => void,
.getblockhash => struct { height: u64 },
};
}
fn RpcRequest(comptime m: Method) type {
return struct {
jsonrpc: []const u8 = "1.0",
id: u64,
method: []const u8,
params: MethodArgs(m),
};
}
fn RpcResponse(comptime m: Method) type {
return struct {
id: u64,
result: ?ResultValue(m),
@"error": ?struct {
code: isize,
//message: ?[]const u8, // no use for it atm
},
};
}
/// makes an RPC call to the addr:port endpoint.
/// the returned value must be deinit'ed when done.
pub fn call(self: *Client, comptime method: Method, args: MethodArgs(method)) !Result(method) {
const addrport = try std.net.Address.resolveIp(self.addr, self.port);
const reqbytes = try self.formatreq(method, args);
defer self.allocator.free(reqbytes);
// connect and send the request
const stream = try std.net.tcpConnectToAddress(addrport);
defer stream.close();
const reader = stream.reader();
_ = try stream.writer().writeAll(reqbytes);
// read and parse the response
try skipResponseHeaders(reader, 4096);
const body = try reader.readAllAlloc(self.allocator, 1 << 20); // 1Mb should be enough for all response types
defer self.allocator.free(body);
return self.parseResponse(method, body);
}
/// reads all response headers, at most `limit` bytes, and returns the index
/// at which response body starts or error.EndOfStream.
/// single header length must be at most `limit` or 1024, whichever is smaller.
fn skipResponseHeaders(r: anytype, comptime limit: usize) !void {
var n: usize = 0;
var buf: [@min(1024, limit)]u8 = undefined;
while (true) {
const slice = try r.readUntilDelimiter(&buf, '\n');
n += slice.len + 1; // delimiter is not included in the slice
if (n > limit) {
return error.StreamTooLong;
}
if (slice.len == 0 or (slice.len == 1 and slice[0] == '\r')) {
return;
}
}
}
fn parseResponse(self: Client, comptime m: Method, b: []const u8) !Result(m) {
var result = Result(m){
.value = undefined,
.arena = try self.allocator.create(ArenaAllocator),
};
errdefer self.allocator.destroy(result.arena);
result.arena.* = ArenaAllocator.init(self.allocator);
var jstream = std.json.TokenStream.init(b);
const jopt = std.json.ParseOptions{ .allocator = result.arena.allocator(), .ignore_unknown_fields = true };
const resp = try std.json.parse(RpcResponse(m), &jstream, jopt);
errdefer result.arena.deinit();
if (resp.@"error") |errfield| {
return rpcErrorFromCode(errfield.code) orelse error.UnknownError;
}
if (resp.result == null) {
return error.NullResult;
}
result.value = resp.result.?;
return result;
}
fn formatreq(self: *Client, comptime m: Method, args: MethodArgs(m)) ![]const u8 {
const req = RpcRequest(m){
.id = self.reqid.fetchAdd(1, .Monotonic),
.method = @tagName(m),
.params = args,
};
var jreq = std.ArrayList(u8).init(self.allocator);
defer jreq.deinit();
try std.json.stringify(req, .{}, jreq.writer());
const auth = try self.getAuthBase64();
defer self.allocator.free(auth);
var bytes = std.ArrayList(u8).init(self.allocator);
const w = bytes.writer();
try w.writeAll("POST / HTTP/1.0\r\n");
//try w.writeAll("Host: 127.0.0.1\n", .{});
try w.writeAll("Connection: close\r\n");
try w.print("Authorization: Basic {s}\r\n", .{auth});
try w.writeAll("Accept: application/json-rpc\r\n");
try w.writeAll("Content-Type: application/json-rpc\r\n");
try w.print("Content-Length: {d}\r\n", .{jreq.items.len});
try w.writeAll("\r\n");
try w.writeAll(jreq.items);
return bytes.toOwnedSlice();
}
fn getAuthBase64(self: Client) ![]const u8 {
const file = try std.fs.openFileAbsolute(self.cookiepath, .{ .mode = .read_only });
defer file.close();
const cookie = try file.readToEndAlloc(self.allocator, 1024);
defer self.allocator.free(cookie);
var auth = try self.allocator.alloc(u8, base64enc.calcSize(cookie.len));
return base64enc.encode(auth, cookie);
}
// taken from bitcoind source code.
// see https://github.com/bitcoin/bitcoin/blob/64440bb73/src/rpc/protocol.h#L23
fn rpcErrorFromCode(code: isize) ?RpcError {
return switch (code) {
// json-rpc 2.0
-32600 => error.RpcInvalidRequest,
-32601 => error.RpcMethodNotFound,
-32602 => error.RpcInvalidParams,
-32603 => error.RpcInternalError,
-32700 => error.RpcParseError,
// general purpose errors
-1 => error.RpcMiscError,
-3 => error.RpcTypeError,
-5 => error.RpcInvalidAddressOrKey,
-7 => error.RpcOutOfMemory,
-8 => error.RpcInvalidParameter,
-20 => error.RpcDatabaseError,
-22 => error.RpcDeserializationError,
-25 => error.RpcVerifyError,
-26 => error.RpcVerifyRejected,
-27 => error.RpcVerifyAlreadyInChain,
-28 => error.RpcInWarmup,
-32 => error.RpcMethodDeprecated,
// p2p client errors
-9 => error.RpcClientNotConnected,
-10 => error.RpcClientInInitialDownload,
-23 => error.RpcClientNodeAlreadyAdded,
-24 => error.RpcClientNodeNotAdded,
-29 => error.RpcClientNodeNotConnected,
-30 => error.RpcClientInvalidIpOrSubnet,
-31 => error.RpcClientP2pDisabled,
-34 => error.RpcClientNodeCapacityReached,
// chain errors
-33 => error.RpcClientMempoolDisabled,
else => null,
};
}
};
pub const BlockchainInfo = struct {
chain: []const u8,
blocks: u64,
headers: u64,
bestblockhash: []const u8,
difficulty: f64,
time: u64, // block time, unix epoch
mediantime: u64, // median block time, unix epoch
verificationprogress: f32, // estimate in [0..1]
initialblockdownload: bool,
size_on_disk: u64,
pruned: bool,
//pruneheight: ?u64, // present if pruning is enabled
//automatic_prunning: ?bool, // present if pruning is enabled
//prune_target_size: ?u64, // present if automatic is enabled
warnings: []const u8,
};
pub const MempoolInfo = struct {
loaded: bool, // whether the mempool is fully loaded
size: usize, // tx count
bytes: u64, // sum of all virtual transaction sizes as per BIP-141 (discounted witness data)
usage: u64, // total memory usage
total_fee: f32, // total fees in BTC ignoring modified fees through prioritisetransaction
maxmempool: u64, // memory usage cap, in bytes
mempoolminfee: f32, // min fee rate in BTC/kvB for tx to be accepted
minrelaytxfee: f32, // current min relay fee rate
incrementalrelayfee: f32, // min fee rate increment for replacement, in BTC/kvB
unbroadcastcount: u64, // number of transactions that haven't passed initial broadcast yet
fullrbf: bool, // whether the mempool accepts RBF without replaceability signaling inspection
};
pub const NetworkInfo = struct {
version: u32,
subversion: []const u8,
protocolversion: u32,
connections: u16, // in + out
connections_in: u16,
connections_out: u16,
networkactive: bool,
networks: []struct {
name: []const u8, // ipv4, ipv6, onion, i2p, cjdns
limited: bool, // whether this network is limited with -onlynet flag
reachable: bool,
},
relayfee: f32, // min rate, in BTC/kvB
incrementalfee: f32, // min rate increment for RBF in BTC/vkB
localaddresses: []struct {
address: []const u8,
port: u16,
score: i16,
},
warnings: []const u8,
};

@ -0,0 +1,20 @@
const std = @import("std");
const base64enc = std.base64.standard.Encoder;
const bitcoinrpc = @import("bitcoindrpc");
pub fn main() !void {
var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
defer if (gpa_state.deinit()) {
std.debug.print("memory leaks detected!", .{});
};
const gpa = gpa_state.allocator();
var client = bitcoinrpc.Client{
.allocator = gpa,
.cookiepath = "/ssd/bitcoind/mainnet/.cookie",
};
const res = try client.call(.getmempoolinfo, {});
defer res.deinit();
std.debug.print("{any}\n", .{res.value});
}