From 2e7b924a0ed3e9e91b33f486d08432e65c9437e1 Mon Sep 17 00:00:00 2001 From: Erik Mackdanz Date: Thu, 30 Nov 2023 00:14:28 -0600 Subject: [PATCH] read bucket's partitions into a single DF --- src/main.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1f40f7d..dee3961 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use std::error::Error; use std::env::args; use std::io::Write; use std::fs::{create_dir_all,File}; +use std::path::PathBuf; async fn map_bucket(partition: &str) -> Result<(),Box<dyn Error>> { @@ -141,13 +142,18 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> { let config = aws_config::load_from_env().await; let client = aws_sdk_s3::Client::new(&config); + // file list for LazyJsonLineReader + let mut filepaths: [PathBuf; 3] = + [PathBuf::new(),PathBuf::new(),PathBuf::new()]; + // there are three input files for each income price bucket - for partition in [0,1,2] { + for partition in 0..3 { println!("downloading partition {}",partition); let downloadfilepath = format!( "out/reducebucket/in/bucket-{}/partition-{}.json", bucket, partition); + filepaths[partition] = PathBuf::from(downloadfilepath.clone()); let mut downloadfile = File::create(downloadfilepath)?; let ins3key = @@ -168,8 +174,13 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> { } } - // todo: read partitions to DF - // todo: concat partitions in bucket + // read partitions to DF + let bucketdf = + LazyJsonLineReader::new_paths(Arc::new(filepaths)) + .finish()?; + println!("bucketdf: {:?}",bucketdf.collect()?); + + // todo: some aggregation on my bucket Ok(()) -- 2.52.0