Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/output_results/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ pub fn output_results(
continue;
}

let partials: Result<Vec<Vec<Box<dyn Aggregator>>>> = ranges
type Partial = (Vec<Box<dyn Aggregator>>, Vec<(DateTime<Local>, String)>);
let partials: Result<Vec<Partial>> = ranges
.par_iter()
.map(|range| -> Result<Vec<Box<dyn Aggregator>>> {
.map(|range| -> Result<Partial> {
let mut local_aggregators: Vec<Box<dyn Aggregator>> =
aggregators.iter().map(|a| a.boxed_clone()).collect();
let mut printed: Vec<(DateTime<Local>, String)> = Vec::new();

let slice = &bytes[range.clone()];

Expand All @@ -142,6 +144,7 @@ pub fn output_results(
&filter_container,
&mut local_aggregators,
converted_args.print_details,
&mut printed,
)?;
record_start = offset;
}
Expand All @@ -156,18 +159,28 @@ pub fn output_results(
&filter_container,
&mut local_aggregators,
converted_args.print_details,
&mut printed,
)?;
}
Ok(local_aggregators)
Ok((local_aggregators, printed))
})
.collect();

debug!("Finished output in: {:?}", timing.elapsed());
let partials = partials?;
for partial in partials {
for (i, aggregator) in partial.into_iter().enumerate() {
let mut printed: Vec<(DateTime<Local>, String)> = Vec::new();
for (partial_aggregators, partial_printed) in partials {
for (i, aggregator) in partial_aggregators.into_iter().enumerate() {
aggregators[i].merge_box(aggregator.as_ref());
}
printed.extend(partial_printed);
}
if converted_args.print_details {
// Stable sort keeps records with equal timestamps in file order.
printed.sort_by_key(|entry| entry.0);
for (_, text) in printed {
println!("{text}");
}
}
for agg in &mut *aggregators {
agg.print();
Expand All @@ -192,6 +205,7 @@ fn filter_record(
filters: &FilterContainer,
local_aggregators: &mut Vec<Box<dyn Aggregator>>,
print: bool,
printed: &mut Vec<(DateTime<Local>, String)>,
) -> Result<()> {
for filter in &filters.filters {
if !filter.matches(record, &filters.format) {
Expand Down Expand Up @@ -238,7 +252,7 @@ fn filter_record(
)?;

if print {
println!("{text}");
printed.push((log_time_local, text.to_string()));
}
Ok(())
}
Expand Down
38 changes: 38 additions & 0 deletions tests/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,44 @@ fn simple_filter_with_hist_subcommand() -> Result<(), Box<dyn std::error::Error>
Ok(())
}

#[test]
fn error_output_sorted_by_timestamp() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::new(cargo::cargo_bin!("pgweasel"));

// The records in debian_default2.log are deliberately out of order in the
// file, so a correct sort must reorder them by leading timestamp.
let output = cmd
.args(["err", "./tests/files/debian_default2.log"])
.assert()
.success()
.get_output()
.stdout
.clone();
let stdout = String::from_utf8(output)?;

// Collect the leading "YYYY-MM-DD HH:MM:SS.mmm" timestamp of every record.
let timestamps: Vec<&str> = stdout
.lines()
.filter(|line| line.len() >= 23 && line.as_bytes()[4] == b'-')
.map(|line| &line[..23])
.collect();

assert!(
timestamps.len() > 1,
"expected multiple error records, got {}",
timestamps.len()
);

let mut sorted = timestamps.clone();
sorted.sort();
assert_eq!(
timestamps, sorted,
"error output was not sorted by timestamp"
);

Ok(())
}

#[test]
fn non_existing_file() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::new(cargo::cargo_bin!("pgweasel"));
Expand Down
Loading