]> Humopery - polarsbop.git/commitdiff
read bucket's partitions into a single DF
authorErik Mackdanz <erikmack@gmail.com>
Thu, 30 Nov 2023 06:14:28 +0000 (00:14 -0600)
committerErik Mackdanz <erikmack@gmail.com>
Thu, 30 Nov 2023 06:14:28 +0000 (00:14 -0600)
src/main.rs

index 1f40f7d707091292924259410cd8495f80bbe279..dee3961ace8df7f5ff12f4258e0c6dd6f47dcf13 100644 (file)
@@ -4,6 +4,7 @@ use std::error::Error;
 use std::env::args;
 use std::io::Write;
 use std::fs::{create_dir_all,File};
+use std::path::PathBuf;
 
 async fn map_bucket(partition: &str) -> Result<(),Box<dyn Error>> {
 
@@ -141,13 +142,18 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> {
     let config = aws_config::load_from_env().await;
     let client = aws_sdk_s3::Client::new(&config);
 
+    // file list for LazyJsonLineReader
+    let mut filepaths: [PathBuf; 3] =
+       [PathBuf::new(),PathBuf::new(),PathBuf::new()];
+
     // there are three input files for each income price bucket
-    for partition in [0,1,2] {
+    for partition in 0..3 {
        println!("downloading partition {}",partition);
 
        let downloadfilepath = format!(
            "out/reducebucket/in/bucket-{}/partition-{}.json",
            bucket, partition);
+       filepaths[partition] = PathBuf::from(downloadfilepath.clone());
        let mut downloadfile = File::create(downloadfilepath)?;
 
        let ins3key =
@@ -168,8 +174,13 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> {
        }
     }
 
-    // todo: read partitions to DF
-    // todo: concat partitions in bucket
+    // read partitions to DF
+    let bucketdf =
+       LazyJsonLineReader::new_paths(Arc::new(filepaths))
+       .finish()?;
+    println!("bucketdf: {:?}",bucketdf.collect()?);
+
+
     // todo: some aggregation on my bucket
 
     Ok(())