use aws_sdk_s3::primitives::ByteStream;
use polars::prelude::*;
+use std::error::Error;
use std::env::args;
+use std::fmt;
+use std::io::Write;
use std::fs::{create_dir_all,File};
-async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
+#[derive(Debug)]
+enum PolarsBOPError {
+ NestedPolarsError(PolarsError),
+ FilesystemError(Box<dyn Error>),
+ S3PutError(Box<dyn Error>),
+ S3GetError(Box<dyn Error>),
+}
+
+impl std::fmt::Display for PolarsBOPError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "PolarsBOPError")
+ }
+}
+impl std::error::Error for PolarsBOPError {
+}
+
+async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
// set some columns to be parsed as numeric
let mut schema_override = Schema::new();
)));
// read and parse from disk
- let mut df = LazyCsvReader::new("data/bop.csv")
+ let df = LazyCsvReader::new("data/bop.csv")
.has_header(true)
.with_dtype_overwrite(schema_override)
.with_null_values(null_values)
- .finish()?;
+ .finish();
+
+ if let Err(e) = df {
+ return Err(PolarsBOPError::NestedPolarsError(e));
+ };
+ let mut df = df.unwrap();
// split list based on arg
match partition {
let mask = col("Value").lt(lit(high));
df_bucket = df_bucket.filter(mask);
- let mut df_bucket = df_bucket.collect()?;
+ let df_bucket = df_bucket.collect();
+ if let Err(e) = df_bucket {
+ return Err(PolarsBOPError::NestedPolarsError(e));
+ }
+ let mut df_bucket = df_bucket.unwrap();
// some console output
println!("df_bucket: {:?}",df_bucket);
// write to local filesystem
- JsonWriter::new(&mut file)
- .finish(&mut df_bucket)?;
+ if let Err(e) = JsonWriter::new(&mut file)
+ .finish(&mut df_bucket) {
+ return Err(PolarsBOPError::NestedPolarsError(e));
+ };
println!("wrote local file {}", outfilepath);
+ // open local file for writing to s3
+ let bytes = ByteStream::from_path(outfilepath).await;
+ if let Err(e) = bytes {
+ return Err(PolarsBOPError::FilesystemError(Box::new(e)));
+ }
+ let bytes = bytes.unwrap();
+
// write to s3
- let _ = client.put_object()
+ if let Err(e) = client.put_object()
.bucket("mackdanz-polars-test")
.key(outs3key.clone())
- .set_body(Some(ByteStream::from_path(outfilepath)
- .await.unwrap()))
- .send().await;
+ .set_body(Some(bytes))
+ .send().await {
+ return Err(PolarsBOPError::S3PutError(Box::new(e)));
+ }
println!("wrote file to S3 {}", outs3key);
}
Ok(())
}
-async fn reduce_bucket(bucket: &str) -> Result<(),PolarsError> {
+/// Downloads the three partition files of `bucket` from S3 into
+/// `out/reducebucket/in/bucket-{bucket}/`.
+///
+/// Each object `mapbucket/bucket-{bucket}/partition-{n}.json` (n = 0, 1, 2)
+/// in the `mackdanz-polars-test` S3 bucket is streamed chunk by chunk into a
+/// local file of the same partition name.
+///
+/// # Errors
+/// * `PolarsBOPError::FilesystemError` if the input directory or a local file
+///   cannot be created, or a chunk cannot be written completely.
+/// * `PolarsBOPError::S3GetError` if the `get_object` request fails or the
+///   response body stream yields an error.
+async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
+    // prep input dir; report failure through the error type instead of panicking
+    let indir = format!("out/reducebucket/in/bucket-{}", bucket);
+    create_dir_all(&indir)
+        .map_err(|e| PolarsBOPError::FilesystemError(Box::new(e)))?;
+
+    // prep S3 client
+    let config = aws_config::load_from_env().await;
+    let client = aws_sdk_s3::Client::new(&config);
+
+    // there are three input files for each income price bucket
+    for partition in [0,1,2] {
+        println!("downloading partition {}",partition);
+
+        let downloadfilepath = format!("{}/partition-{}.json", indir, partition);
+        let mut downloadfile = File::create(downloadfilepath)
+            .map_err(|e| PolarsBOPError::FilesystemError(Box::new(e)))?;
+
+        let ins3key = format!("mapbucket/bucket-{}/partition-{}.json", bucket, partition);
+
+        // call S3 get_object
+        let getobjresult = client.get_object()
+            .bucket("mackdanz-polars-test")
+            .key(ins3key)
+            .send().await
+            .map_err(|e| PolarsBOPError::S3GetError(Box::new(e)))?;
+
+        // stream body to local filesystem
+        // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html
+        let mut body = getobjresult.body;
+        while let Some(chunk) = body.try_next().await
+            .map_err(|e| PolarsBOPError::S3GetError(Box::new(e)))?
+        {
+            // write_all, not write: a plain write may accept only part of
+            // the chunk, which would silently truncate the downloaded file
+            downloadfile.write_all(&chunk)
+                .map_err(|e| PolarsBOPError::FilesystemError(Box::new(e)))?;
+        }
+    }
+
+    // todo: read partitions to DF
+    // todo: concat partitions in bucket
+    // todo: some aggregation on my bucket
+
    Ok(())
}
#[::tokio::main]
-async fn main() -> Result<(),PolarsError> {
+async fn main() -> Result<(),PolarsBOPError> {
let mut argsiter = args();
// first arg is program name
}
}
-