yats.git

ref: d5669fb6eab6ee1b6741def4f86f43bf024f0713

client/client-parquet.go


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * Yats - yats
 *
 * This file is licensed under the Affero General Public License version 3 or
 * later. See the COPYING file.
 *
 * @author Paolo Lulli <kevwe.com>
 * @copyright Paolo Lulli 2024
 */

package main

import (
	"fmt"
	"github.com/xitongsys/parquet-go/parquet"
	"log"
	"os"
	"strconv"

	"github.com/xitongsys/parquet-go/writer"
)

// MapToParquet writes every row of data to a Parquet file at parquetFile,
// creating the file or truncating it if it already exists.
//
// All rows go through a single writer configured with Snappy compression and
// a 128MB row-group size; the writer is stopped exactly once, after the last
// row. On success a one-line summary is printed to standard output.
//
// It returns the first error hit while creating the file, creating the
// writer, writing a row, stopping the writer, or closing the file. Errors are
// wrapped with %w, so errors.Is / errors.As still see the underlying cause.
func MapToParquet(data []map[string]any, parquetFile string) (err error) {
	// Create or truncate the file.
	file, err := os.Create(parquetFile)
	if err != nil {
		return fmt.Errorf("creating parquet file %q: %w", parquetFile, err)
	}
	// A failed Close on a file we just wrote can mean lost data, so report it
	// unless an earlier error is already being returned.
	defer func() {
		if cerr := file.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing parquet file %q: %w", parquetFile, cerr)
		}
	}()

	// Define the Parquet schema: one "BYTE_ARRAY" entry per element of data,
	// keyed by that element's index.
	// NOTE(review): the keys are row indices, not the rows' column names —
	// confirm this is the schema the writer is meant to receive.
	schema := make(map[string]string, len(data))
	for i := range data {
		schema[strconv.Itoa(i)] = "BYTE_ARRAY"
	}

	// Create a new ParquetWriter on top of the file.
	pw, err := writer.NewParquetWriterFromWriter(file, schema, 4)
	if err != nil {
		log.Println("Can't create parquet writer", err)
		return fmt.Errorf("creating parquet writer: %w", err)
	}

	// Writer properties do not depend on the row, so set them once up front.
	pw.RowGroupSize = 128 * 1024 * 1024 // 128MB
	pw.CompressionType = parquet.CompressionCodec_SNAPPY

	// Write the data, one row at a time.
	for i, row := range data {
		if err = pw.Write(row); err != nil {
			log.Println("Can't write parquet row", err)
			return fmt.Errorf("writing row %d to %q: %w", i, parquetFile, err)
		}
	}

	// Stop the writer once, after all rows: stopping it inside the loop would
	// end the write session after the first row.
	if err = pw.WriteStop(); err != nil {
		log.Println("Can't stop parquet writer", err)
		return fmt.Errorf("finalizing parquet file %q: %w", parquetFile, err)
	}

	fmt.Printf("Wrote %d records to %s\n", len(data), parquetFile)
	return nil
}