parser: implement streaming parser

With my pathological 50MiB 10_000 line nested list test, this is
definitely slower than the one-shot parser, but it has peak memory
usage of 5MiB compared to the 120MiB of the one-shot parsing. Not bad.
Obviously this result is largely dependent on the fact that this
particular benchmark is 99% whitespace, which does not get copied into
the resulting document. A (significantly) smaller improvement will be
observed in files that are mostly data with little indentation or
empty lines.

But a win is a win.
This commit is contained in:
torque 2023-09-25 01:18:09 -07:00
parent 5037f69fbe
commit 7f82c24584
Signed by: torque
SSH Key Fingerprint: SHA256:nCrXefBNo6EbjNSQhv0nXmEg/VuNq3sMF5b8zETw3Tk
5 changed files with 85 additions and 13 deletions

View File

@ -25,6 +25,7 @@ const Example = struct {
const examples = [_]Example{ const examples = [_]Example{
.{ .name = "parse", .file = "examples/parse.zig" }, .{ .name = "parse", .file = "examples/parse.zig" },
.{ .name = "stream", .file = "examples/stream.zig" },
}; };
pub fn add_examples(b: *std.build, options: ExampleOptions) void { pub fn add_examples(b: *std.build, options: ExampleOptions) void {

30
examples/stream.zig Normal file
View File

@ -0,0 +1,30 @@
const std = @import("std");
const nice = @import("nice");
/// Example: stream-parse the file named by the first CLI argument in
/// fixed-size chunks, then dump the resulting document's debug form.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const argv = try std.process.argsAlloc(alloc);
    defer std.process.argsFree(alloc, argv);
    // No input file given: nothing to do.
    if (argv.len < 2) return;

    const doc: nice.Document = result: {
        const input = try std.fs.cwd().openFile(argv[1], .{});
        defer input.close();

        var stream_parser = try nice.StreamParser.init(alloc, .{});
        defer stream_parser.deinit();

        // Feed the file to the parser in 1 KiB chunks; EOF (a zero-length
        // read) ends the loop and yields the finished document.
        var chunk = std.mem.zeroes([1024]u8);
        while (true) {
            const count = try input.read(&chunk);
            if (count == 0) break :result try stream_parser.finish();
            try stream_parser.feed(chunk[0..count]);
        }
    };
    defer doc.deinit();

    doc.printDebug();
}

View File

@ -25,6 +25,10 @@ pub const LineBuffer = struct {
}; };
} }
pub fn deinit(self: LineBuffer) void {
self.allocator.free(self.internal.buffer);
}
pub fn feed(self: *LineBuffer, data: []const u8) Error!void { pub fn feed(self: *LineBuffer, data: []const u8) Error!void {
if (data.len == 0) return; if (data.len == 0) return;
// TODO: check for usize overflow here if we want Maximum Robustness // TODO: check for usize overflow here if we want Maximum Robustness
@ -34,23 +38,21 @@ pub const LineBuffer = struct {
if (new_window_len > self.internal.buffer.len) { if (new_window_len > self.internal.buffer.len) {
// TODO: adopt an overallocation strategy? Will potentially avoid allocating // TODO: adopt an overallocation strategy? Will potentially avoid allocating
// on every invocation but will cause the buffer to oversize // on every invocation but will cause the buffer to oversize
try self.allocator.realloc(self.internal.buffer, new_window_len); self.internal.buffer = try self.allocator.realloc(@constCast(self.internal.buffer), new_window_len);
self.rehome(); self.rehome();
@memcpy(self.internal.buffer[self.used..].ptr, data); @memcpy(@constCast(self.internal.buffer[self.used..].ptr), data);
self.used = new_window_len;
self.internal.window.len = new_window_len;
} }
// data will fit, but needs to be moved in the buffer // data will fit, but needs to be moved in the buffer
else if (self.internal.window.start + new_window_len > self.internal.buffer.len) { else if (self.internal.window.start + new_window_len > self.internal.buffer.len) {
self.rehome(); self.rehome();
@memcpy(self.internal.buffer[self.used..].ptr, data); @memcpy(@constCast(self.internal.buffer[self.used..].ptr), data);
self.used = new_window_len;
self.internal.window.len = new_window_len;
} }
// data can simply be appended // data can simply be appended
else { else {
@memcpy(self.internal.buffer[self.used..].ptr, data); @memcpy(@constCast(self.internal.buffer[self.used..].ptr), data);
} }
self.used += data.len;
self.internal.window.len = new_window_len;
} }
/// The memory returned by this function is valid until the next call to `feed`. /// The memory returned by this function is valid until the next call to `feed`.
@ -88,7 +90,7 @@ pub const FixedLineBuffer = struct {
// move the current scan window to the beginning of the buffer. This internal // move the current scan window to the beginning of the buffer. This internal
// method is used by LineBuffer. // method is used by LineBuffer.
fn rehome(self: *LineBuffer) usize { fn rehome(self: *FixedLineBuffer) void {
if (self.window.start == 0) return; if (self.window.start == 0) return;
const window = self.buffer[self.window.start..][0..self.window.len]; const window = self.buffer[self.window.start..][0..self.window.len];
@ -96,9 +98,9 @@ pub const FixedLineBuffer = struct {
// if the window is longer than its starting index, the memory move will be // if the window is longer than its starting index, the memory move will be
// overlapping, so we can't use memcpy // overlapping, so we can't use memcpy
if (self.window.len > self.window.start) if (self.window.len > self.window.start)
std.mem.copyForwards(u8, self.buffer, window) std.mem.copyForwards(u8, @constCast(self.buffer), window)
else else
@memcpy(self.buffer.ptr, window); @memcpy(@constCast(self.buffer.ptr), window);
self.window.start = 0; self.window.start = 0;
} }

View File

@ -65,3 +65,6 @@ pub const buffers = @import("./linebuffer.zig");
pub const tokenizer = @import("./tokenizer.zig"); pub const tokenizer = @import("./tokenizer.zig");
pub const parser = @import("./parser.zig"); pub const parser = @import("./parser.zig");
pub const Parser = parser.Parser; pub const Parser = parser.Parser;
pub const StreamParser = parser.StreamParser;
pub const Document = parser.Document;
pub const Value = parser.Value;

View File

@ -2,9 +2,9 @@ const std = @import("std");
const buffers = @import("./linebuffer.zig"); const buffers = @import("./linebuffer.zig");
const tokenizer = @import("./tokenizer.zig"); const tokenizer = @import("./tokenizer.zig");
const Value = @import("./parser/value.zig").Value;
const State = @import("./parser/state.zig").State; const State = @import("./parser/state.zig").State;
const Document = @import("./parser/state.zig").Document; pub const Document = @import("./parser/state.zig").Document;
pub const Value = @import("./parser/value.zig").Value;
pub const Diagnostics = struct { pub const Diagnostics = struct {
row: usize, row: usize,
@ -67,3 +67,39 @@ pub const Parser = struct {
return try state.finish(self.options); return try state.finish(self.options);
} }
}; };
/// Incremental (streaming) parser: `feed` arbitrary chunks of input as they
/// arrive, then call `finish` to obtain the completed `Document`. Unlike the
/// one-shot `Parser`, only the current line window is buffered, so peak
/// memory scales with line length rather than document size.
pub const StreamParser = struct {
    // Tokenizer specialized over the growable LineBuffer (the one-shot
    // parser uses FixedLineBuffer instead).
    linetok: tokenizer.LineTokenizer(buffers.LineBuffer),
    // Line-by-line parse state machine; accumulates the document.
    state: State,
    options: Options = .{},
    // Location/message of the most recent problem; placeholder values
    // until a parse error overwrites them.
    diagnostics: Diagnostics = .{
        .row = 0,
        .span = .{ .absolute = 0, .line_offset = 0, .length = 0 },
        .message = "all is well",
    },

    /// Allocate the line buffer and initialize parse state. Call `deinit`
    /// to release resources, or `finish` to produce the Document.
    pub fn init(allocator: std.mem.Allocator, options: Options) !StreamParser {
        return .{
            .linetok = .{
                .buffer = try buffers.LineBuffer.init(allocator),
                // The tokenizer needs a pointer to *this* parser's
                // diagnostics field, but the struct has not reached its
                // final address yet (it is returned by value). The previous
                // code used `@as(*StreamParser, @ptrFromInt(@returnAddress()))`,
                // which reinterprets a code address as a StreamParser pointer —
                // undefined behavior that dangles immediately. Leave the
                // pointer undefined here and fix it up in `feed`/`finish`,
                // once `self` has a stable address.
                .diagnostics = undefined,
            },
            .state = State.init(allocator),
            .options = options,
        };
    }

    /// Free the line buffer and any partially-built document state.
    pub fn deinit(self: StreamParser) void {
        self.linetok.buffer.deinit();
        self.state.deinit();
    }

    /// Buffer `data` and parse every complete line it makes available.
    /// `data` may be split at arbitrary boundaries, including mid-line.
    pub fn feed(self: *StreamParser, data: []const u8) Error!void {
        // Repoint the tokenizer's diagnostics at our (now address-stable)
        // field; see the note in `init`.
        self.linetok.diagnostics = &self.diagnostics;
        try self.linetok.buffer.feed(data);
        while (try self.linetok.next()) |line|
            try self.state.parseLine(line, self.options.duplicate_key_behavior);
    }

    /// Finalize parsing and return the completed document.
    /// Caller owns the returned Document and must deinit it.
    pub fn finish(self: *StreamParser) Error!Document {
        // Keep the diagnostics pointer valid for any final reporting;
        // see the note in `init`.
        self.linetok.diagnostics = &self.diagnostics;
        return try self.state.finish(self.options);
    }
};