use polars::prelude::*;
use std::error::Error;
use std::env::args;
-use std::fmt;
use std::io::Write;
use std::fs::{create_dir_all,File};
-#[derive(Debug)]
-enum PolarsBOPError {
- NestedPolarsError(PolarsError),
- FilesystemError(Box<dyn Error>),
- S3PutError(Box<dyn Error>),
- S3GetError(Box<dyn Error>),
-}
-
-impl std::fmt::Display for PolarsBOPError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "PolarsBOPError")
- }
-}
-impl std::error::Error for PolarsBOPError {
-}
-
-async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
+async fn map_bucket(partition: &str) -> Result<(),Box<dyn Error>> {
// set some columns to be parsed as numeric
let mut schema_override = Schema::new();
)));
// read and parse from disk
- let df = LazyCsvReader::new("data/bop.csv")
+ let mut df = LazyCsvReader::new("data/bop.csv")
.has_header(true)
.with_dtype_overwrite(schema_override)
.with_null_values(null_values)
- .finish();
-
- if let Err(e) = df {
- return Err(PolarsBOPError::NestedPolarsError(e));
- };
- let mut df = df.unwrap();
+ .finish()?;
// split list based on arg
match partition {
let mask = col("Value").lt(lit(high));
df_bucket = df_bucket.filter(mask);
- let df_bucket = df_bucket.collect();
- if let Err(e) = df_bucket {
- return Err(PolarsBOPError::NestedPolarsError(e));
- }
- let mut df_bucket = df_bucket.unwrap();
+ let mut df_bucket = df_bucket.collect()?;
// some console output
println!("df_bucket: {:?}",df_bucket);
// write to local filesystem
- if let Err(e) = JsonWriter::new(&mut file)
- .finish(&mut df_bucket) {
- return Err(PolarsBOPError::NestedPolarsError(e));
- };
+ JsonWriter::new(&mut file)
+ .finish(&mut df_bucket)?;
println!("wrote local file {}", outfilepath);
// open local file for writing to s3
- let bytes = ByteStream::from_path(outfilepath).await;
- if let Err(e) = bytes {
- return Err(PolarsBOPError::FilesystemError(Box::new(e)));
- }
- let bytes = bytes.unwrap();
+ let bytes = ByteStream::from_path(outfilepath).await?;
// write to s3
- if let Err(e) = client.put_object()
+ client.put_object()
.bucket("mackdanz-polars-test")
.key(outs3key.clone())
.set_body(Some(bytes))
- .send().await {
- return Err(PolarsBOPError::S3PutError(Box::new(e)));
- }
+ .send().await?;
println!("wrote file to S3 {}", outs3key);
}
Ok(())
}
-async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
+async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> {
// prep input dir
let bucketdir = format!("bucket-{}",bucket);
create_dir_all(format!("out/reducebucket/in/{}",bucketdir.clone()))
let downloadfilepath = format!(
"out/reducebucket/in/bucket-{}/partition-{}.json",
bucket, partition);
- let downloadfile = File::create(downloadfilepath);
- if let Err(e) = downloadfile {
- return Err(PolarsBOPError::FilesystemError(Box::new(e)));
- }
- let mut downloadfile = downloadfile.unwrap();
+ let mut downloadfile = File::create(downloadfilepath)?;
let ins3key =
format!("mapbucket/bucket-{}/partition-{}.json",
let getobjresult = client.get_object()
.bucket("mackdanz-polars-test")
.key(ins3key.clone())
- .send().await;
-
- if let Err(e) = getobjresult {
- return Err(PolarsBOPError::S3GetError(Box::new(e)));
- }
- let getobjresult = getobjresult.unwrap();
+ .send().await?;
// stream body to local filesystem
// https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html
let mut body = getobjresult.body;
loop {
- let bytes = body.try_next().await;
- if let Err(e) = bytes {
- return Err(PolarsBOPError::S3GetError(Box::new(e)));
- }
- let bytes = bytes.unwrap();
+ let bytes = body.try_next().await?;
if let Some(b) = bytes {
- let writeres = downloadfile.write(&b);
- if let Err(e) = writeres {
- return Err(PolarsBOPError::FilesystemError(Box::new(e)));
- }
+ // `write` may accept only part of the buffer and report a short count;
+ // `write_all` keeps writing until the whole chunk is written, so the
+ // downloaded file cannot be silently truncated.
+ downloadfile.write_all(&b)?;
} else {
break;
}
}
#[::tokio::main]
-async fn main() -> Result<(),PolarsBOPError> {
+async fn main() -> Result<(),Box<dyn Error>> {
let mut argsiter = args();
// first arg is program name