Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
include:
- { rust: stable, os: ubuntu-22.04 }
- { rust: stable, os: ubuntu-24.04 }
services:
clickhouse:
image: clickhouse/clickhouse-server:25.8.4
Expand All @@ -28,7 +28,7 @@ jobs:
--ulimit nofile=262144:262144
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}
Expand All @@ -49,7 +49,7 @@ jobs:
fail-fast: false
matrix:
include:
- { rust: 1.75.0, os: ubuntu-22.04 }
- { rust: 1.85.0, os: ubuntu-24.04 }
services:
clickhouse:
image: clickhouse/clickhouse-server:25.8.4
Expand All @@ -61,7 +61,7 @@ jobs:
--ulimit nofile=262144:262144
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: true
- uses: dtolnay/rust-toolchain@stable
Expand All @@ -84,10 +84,10 @@ jobs:
fail-fast: false
matrix:
include:
- { rust: nightly-2024-03-31, os: ubuntu-22.04 }
- { rust: nightly-2026-03-10, os: ubuntu-24.04 }
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: true
- uses: dtolnay/rust-toolchain@stable
Expand All @@ -108,10 +108,10 @@ jobs:
fail-fast: false
matrix:
include:
- { rust: stable, os: ubuntu-22.04 }
- { rust: stable, os: ubuntu-24.04 }
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: true
- uses: dtolnay/rust-toolchain@stable
Expand All @@ -129,10 +129,10 @@ jobs:
# fail-fast: false
# matrix:
# include:
# - { rust: stable, os: ubuntu-22.04 }
# - { rust: stable, os: ubuntu-24.04 }
# steps:
# - name: Checkout
# uses: actions/checkout@v4
# uses: actions/checkout@v6
# with:
# submodules: true
# - uses: dtolnay/rust-toolchain@stable
Expand Down
4 changes: 2 additions & 2 deletions klickhouse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
name = "klickhouse"
version = "0.14.0"
authors = ["Protryon <max.bruce12@gmail.com>"]
edition = "2021"
edition = "2024"
license = "MIT OR Apache-2.0"
repository = "https://github.com/Protryon/klickhouse"
description = "Klickhouse is a pure Rust SDK for working with Clickhouse with the native protocol in async environments with minimal boilerplate and maximal performance."
keywords = ["clickhouse", "database", "tokio", "sql"]
readme = "../README.md"
autotests = false
rust-version = "1.75.0"
rust-version = "1.85.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 1 addition & 1 deletion klickhouse/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rustc_version::{version, Version};
use rustc_version::{Version, version};

fn main() {
if version().unwrap() >= Version::parse("1.51.0").unwrap() {
Expand Down
44 changes: 29 additions & 15 deletions klickhouse/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::Utc;
use futures_util::StreamExt;
use klickhouse::*;
use tokio::sync::oneshot;

#[derive(Row, Debug, Default)]
pub struct MyUserData {
Expand All @@ -18,25 +19,38 @@ async fn main() {
.await
.unwrap();

let (tx, mut rx) = oneshot::channel::<()>();

// Retrieve and display query progress events
let mut progress = client.subscribe_progress();
let progress_task = tokio::task::spawn(async move {
let mut current_query = Uuid::nil();
let mut progress_total = Progress::default();
while let Ok((query, progress)) = progress.recv().await {
if query != current_query {
progress_total = Progress::default();
current_query = query;

loop {
tokio::select! {
// Stop task
_ = &mut rx => {
break
}

// Progress loop
Ok((query, progress)) = progress.recv() => {
if query != current_query {
progress_total = Progress::default();
current_query = query;
}
progress_total += progress;
println!(
"Progress on query {}: {}/{} {:.2}%",
query,
progress_total.read_rows,
progress_total.new_total_rows_to_read,
100.0 * progress_total.read_rows as f64
/ progress_total.new_total_rows_to_read as f64
);
}
}
progress_total += progress;
println!(
"Progress on query {}: {}/{} {:.2}%",
query,
progress_total.read_rows,
progress_total.new_total_rows_to_read,
100.0 * progress_total.read_rows as f64
/ progress_total.new_total_rows_to_read as f64
);
}
});

Expand Down Expand Up @@ -81,7 +95,7 @@ async fn main() {
println!("row received '{}': {:?}", row.id, row);
}

// Drop the client so that the progress task finishes.
drop(client);
// Send signal so that the progress task finishes.
tx.send(()).unwrap();
progress_task.await.unwrap();
}
44 changes: 29 additions & 15 deletions klickhouse/examples/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::Utc;
use futures_util::StreamExt;
use klickhouse::*;
use tokio::sync::oneshot;

#[derive(Row, Debug, Default)]
pub struct MyUserData {
Expand All @@ -26,25 +27,38 @@ async fn main() {

let client = pool.get().await.unwrap();

let (tx, mut rx) = oneshot::channel::<()>();

// Retrieve and display query progress events
let mut progress = client.subscribe_progress();
let progress_task = tokio::task::spawn(async move {
let mut current_query = Uuid::nil();
let mut progress_total = Progress::default();
while let Ok((query, progress)) = progress.recv().await {
if query != current_query {
progress_total = Progress::default();
current_query = query;

loop {
tokio::select! {
// Stop task
_ = &mut rx => {
break
}

// Progress loop
Ok((query, progress)) = progress.recv() => {
if query != current_query {
progress_total = Progress::default();
current_query = query;
}
progress_total += progress;
println!(
"Progress on query {}: {}/{} {:.2}%",
query,
progress_total.read_rows,
progress_total.new_total_rows_to_read,
100.0 * progress_total.read_rows as f64
/ progress_total.new_total_rows_to_read as f64
);
}
}
progress_total += progress;
println!(
"Progress on query {}: {}/{} {:.2}%",
query,
progress_total.read_rows,
progress_total.new_total_rows_to_read,
100.0 * progress_total.read_rows as f64
/ progress_total.new_total_rows_to_read as f64
);
}
});

Expand Down Expand Up @@ -89,7 +103,7 @@ async fn main() {
println!("row received '{}': {:?}", row.id, row);
}

// Drop the client so that the progress task finishes.
drop(client);
// Send signal so that the progress task finishes.
tx.send(()).unwrap();
progress_task.await.unwrap();
}
2 changes: 1 addition & 1 deletion klickhouse/rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2024"
2 changes: 1 addition & 1 deletion klickhouse/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use indexmap::IndexMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::{
KlickhouseError,
io::{ClickhouseRead, ClickhouseWrite},
types::{DeserializerState, SerializerState, Type},
values::Value,
KlickhouseError,
};

/// Metadata about a block
Expand Down
6 changes: 3 additions & 3 deletions klickhouse/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use futures_util::{stream, Stream, StreamExt};
use futures_util::{Stream, StreamExt, stream};
use indexmap::IndexMap;
use protocol::CompressionMethod;
use tokio::{
Expand All @@ -17,6 +17,7 @@ use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

use crate::{
KlickhouseError, ParsedQuery, RawRow, Result,
block::{Block, BlockInfo},
convert::Row,
internal_client_in::InternalClientIn,
Expand All @@ -26,7 +27,6 @@ use crate::{
io::{ClickhouseRead, ClickhouseWrite},
progress::Progress,
protocol::{self, ServerPacket},
KlickhouseError, ParsedQuery, RawRow, Result,
};
use log::*;

Expand Down Expand Up @@ -130,7 +130,7 @@ impl<R: ClickhouseRead + 'static, W: ClickhouseWrite> InnerClient<R, W> {
ServerPacket::Hello(_) => {
return Err(KlickhouseError::ProtocolError(
"unexpected retransmission of server hello".to_string(),
))
));
}
ServerPacket::Data(block) => {
if let Some((_, current)) = self.executing_query.as_ref() {
Expand Down
2 changes: 1 addition & 1 deletion klickhouse/src/convert/json.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::DeserializeOwned};

use crate::{FromSql, KlickhouseError, Result, ToSql, Type, Value};

Expand Down
2 changes: 1 addition & 1 deletion klickhouse/src/convert/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;

use crate::{types::Type, KlickhouseError, Result, Value};
use crate::{KlickhouseError, Result, Value, types::Type};

mod raw_row;
mod std_deserialize;
Expand Down
6 changes: 1 addition & 5 deletions klickhouse/src/convert/raw_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ pub trait RowIndex {
impl RowIndex for usize {
fn get<'a, I: IntoIterator<Item = &'a str>>(&self, columns: I) -> Option<usize> {
let count = columns.into_iter().count();
if count >= *self {
Some(*self)
} else {
None
}
if count >= *self { Some(*self) } else { None }
}
}

Expand Down
2 changes: 1 addition & 1 deletion klickhouse/src/convert/std_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl<T: FromSql> FromSql for Box<T> {
}

macro_rules! tuple_impls {
($($len:expr => ($($n:tt $name:ident)+))+) => {
($($len:literal => ($($n:tt $name:ident)+))+) => {
$(
impl<$($name: FromSql),+> FromSql for ($($name,)+) {
fn from_sql(type_: &Type, value: Value) -> Result<Self> {
Expand Down
2 changes: 1 addition & 1 deletion klickhouse/src/convert/std_serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl<T: ToSql> ToSql for Box<T> {
}

macro_rules! tuple_impls {
($($len:expr => ($($n:tt $name:ident)+))+) => {
($($len:literal => ($($n:tt $name:ident)+))+) => {
$(
impl<$($name: ToSql),+> ToSql for ($($name,)+) {
fn to_sql(self, type_hint: Option<&Type>) -> Result<Value> {
Expand Down
15 changes: 8 additions & 7 deletions klickhouse/src/internal_client_in.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::Result;
use crate::{
KlickhouseError,
block::Block,
io::ClickhouseRead,
progress::Progress,
protocol::{
self, BlockStreamProfileInfo, CompressionMethod, ServerData, ServerException, ServerHello,
ServerPacket, TableColumns, TableStatus, TablesStatusResponse,
DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO, DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME,
DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE, DBMS_MIN_REVISION_WITH_VERSION_PATCH,
MAX_STRING_SIZE,
self, BlockStreamProfileInfo, CompressionMethod, DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO,
DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME, DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE,
DBMS_MIN_REVISION_WITH_VERSION_PATCH, MAX_STRING_SIZE, ServerData, ServerException,
ServerHello, ServerPacket, TableColumns, TableStatus, TablesStatusResponse,
},
KlickhouseError,
};
use indexmap::IndexMap;
use log::trace;
Expand Down Expand Up @@ -62,7 +61,9 @@ impl<R: ClickhouseRead + 'static> InternalClientIn<R> {

#[cfg(not(feature = "compression"))]
async fn decompress_data(&mut self, _compression: CompressionMethod) -> Result<Block> {
panic!("attempted to use compression when not compiled with `compression` feature in klickhouse");
panic!(
"attempted to use compression when not compiled with `compression` feature in klickhouse"
);
}

async fn receive_data(&mut self, compression: CompressionMethod) -> Result<ServerData> {
Expand Down
10 changes: 6 additions & 4 deletions klickhouse/src/internal_client_out.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::{
Result,
block::Block,
io::ClickhouseWrite,
protocol::{
self, CompressionMethod, ServerHello, DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH,
self, CompressionMethod, DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH,
DBMS_MIN_REVISION_WITH_CLIENT_INFO, DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET,
DBMS_MIN_REVISION_WITH_OPENTELEMETRY, DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO,
DBMS_MIN_REVISION_WITH_VERSION_PATCH,
DBMS_MIN_REVISION_WITH_VERSION_PATCH, ServerHello,
},
Result,
};
use tokio::io::AsyncWriteExt;
use uuid::Uuid;
Expand Down Expand Up @@ -185,7 +185,9 @@ impl<W: ClickhouseWrite> InternalClientOut<W> {

#[cfg(not(feature = "compression"))]
async fn compress_data(&mut self, _byte: u8, _block: &Block) -> Result<()> {
panic!("attempted to use compression when not compiled with `compression` feature in klickhouse");
panic!(
"attempted to use compression when not compiled with `compression` feature in klickhouse"
);
}

pub async fn send_data(
Expand Down
Loading