import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.Tuple;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Bulk-inserts {@code ROW_COUNT} synthetic Argo profile rows, each carrying a
 * ~28KB JSONB payload, into Postgres via the Vert.x reactive SQL client.
 *
 * Maven dependencies:
 *
 * <dependencies>
 *   <dependency>
 *     <groupId>io.vertx</groupId>
 *     <artifactId>vertx-core</artifactId>
 *     <version>4.5.8</version>
 *   </dependency>
 *   <dependency>
 *     <groupId>io.vertx</groupId>
 *     <artifactId>vertx-pg-client</artifactId>
 *     <version>4.5.8</version>
 *   </dependency>
 * </dependencies>
 */
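/*
 * Assumed table schema (not part of the original paste): a minimal sketch that
 * satisfies the INSERT below and its ON CONFLICT (float_id, cycle) clause,
 * which requires a unique constraint on that column pair.
 *
 *   CREATE TABLE IF NOT EXISTS argo_profiles (
 *       float_id     INTEGER NOT NULL,
 *       cycle        INTEGER NOT NULL,
 *       measurements JSONB   NOT NULL,
 *       PRIMARY KEY (float_id, cycle)
 *   );
 */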
public class BulkInsertArgoProfiles extends AbstractVerticle {

    private static final Random random = new Random();

    // Target payload size (approx) for the "measurements" JSONB column
    private static final int TARGET_JSON_SIZE_BYTES = 28 * 1024; // ~28KB
    private static final int ROW_COUNT = 500;

    private PgPool client;
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new BulkInsertArgoProfiles(), ar -> {
            if (ar.failed()) {
                System.err.println("Failed to deploy verticle: " + ar.cause());
                vertx.close();
            }
        });
    }
    @Override
    public void start(Promise<Void> startPromise) {
        // Configure the Postgres connection
        PgConnectOptions connectOptions = new PgConnectOptions()
                .setPort(5432)
                .setHost("localhost")
                .setDatabase("postgres")
                .setUser("postgres")
                .setPassword("");

        // Configure the connection pool
        PoolOptions poolOptions = new PoolOptions()
                .setMaxSize(10);

        client = PgPool.pool(vertx, connectOptions, poolOptions);

        // 1. Build all payloads first (synchronous)
        List<Tuple> batch = buildBatch();
        System.out.println("Finished building " + batch.size() + " payloads.");

        // 2. Time ONLY the insert
        long insertStart = System.currentTimeMillis();
        bulkInsertArgoProfiles(client, batch)
                .onSuccess(rows -> {
                    long insertElapsed = System.currentTimeMillis() - insertStart;
                    System.out.println("Successfully inserted " + rows + " rows into argo_profiles.");
                    System.out.println("Insert elapsed ms (DB only): " + insertElapsed);
                    startPromise.complete();
                    vertx.close();
                })
                .onFailure(err -> {
                    System.err.println("Bulk insert failed: " + err.getMessage());
                    err.printStackTrace();
                    startPromise.fail(err);
                    vertx.close();
                });
    }
    /**
     * Build all tuples (including JSON payloads) for the batch insert.
     * This is done BEFORE we start timing the insert.
     */
    private List<Tuple> buildBatch() {
        List<Tuple> batch = new ArrayList<>(ROW_COUNT);
        for (int i = 0; i < ROW_COUNT; i++) {
            int floatId = random.nextInt(); // keeps (float_id, cycle) unique enough for testing
            int cycle = 1;
            JsonObject measurements = buildMeasurementsJson(i, TARGET_JSON_SIZE_BYTES);
            // Sent as a String and cast to jsonb in the SQL statement
            Tuple tuple = Tuple.of(
                    floatId,
                    cycle,
                    measurements.encode() // JSON string
            );
            batch.add(tuple);
        }
        return batch;
    }
    /**
     * Perform the actual batch insert. Assumes the batch is already built.
     */
    private Future<Integer> bulkInsertArgoProfiles(SqlClient client, List<Tuple> batch) {
        String sql = """
                INSERT INTO argo_profiles (float_id, cycle, measurements)
                VALUES ($1, $2, $3::jsonb)
                ON CONFLICT (float_id, cycle) DO NOTHING
                """;
        return client
                .preparedQuery(sql)
                .executeBatch(batch)
                .map(rowSet -> {
                    // executeBatch chains one result per tuple via next();
                    // rowCount() on the head alone would only report the
                    // first statement's count, not the whole batch.
                    int total = 0;
                    for (RowSet<Row> rs = rowSet; rs != null; rs = rs.next()) {
                        total += rs.rowCount();
                    }
                    System.out.println("Postgres reported total rowCount = " + total);
                    return total;
                });
    }
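    /*
     * A minimal sketch (an addition, not in the original paste) of chunking the
     * batch if ROW_COUNT grows large: splitting into several smaller
     * executeBatch calls bounds per-round-trip memory. The method name, the
     * chunkSize parameter, and the sequential Future composition are all
     * assumptions, not part of the original code.
     */
    private Future<Integer> insertInChunks(SqlClient client, List<Tuple> batch, int chunkSize) {
        Future<Integer> result = Future.succeededFuture(0);
        for (int i = 0; i < batch.size(); i += chunkSize) {
            List<Tuple> chunk = batch.subList(i, Math.min(i + chunkSize, batch.size()));
            // Chain the chunks sequentially, accumulating the inserted-row total
            result = result.compose(total -> bulkInsertArgoProfiles(client, chunk).map(n -> total + n));
        }
        return result;
    }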
    /**
     * Build a JSON object whose nested structure is roughly targetSizeBytes when encoded.
     * We do this recursively: each level adds a chunk of random data plus some metadata.
     */
    private JsonObject buildMeasurementsJson(int index, int targetSizeBytes) {
        int maxDepth = 50; // recursion depth
        int bytesPerLevel = targetSizeBytes / maxDepth;
        JsonObject root = new JsonObject()
                .put("meta", new JsonObject()
                        .put("createdAt", Instant.now().toString())
                        .put("index", index)
                        .put("description", "Synthetic profile measurement payload ~28KB"));
        // Nested recursive payload under "payload"
        JsonObject payload = buildRecursivePayload(0, maxDepth, bytesPerLevel, index);
        root.put("payload", payload);
        return root;
    }
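    /*
     * Size note (added, back-of-the-envelope): 28 * 1024 / 50 gives a 573-char
     * chunk per level, so the chunks alone encode to about 28,650 bytes, just
     * under the 28KB target. Each level also carries its keys and numbers
     * ("level", "seed", "chunk", "randomValue", "child"), roughly 80 bytes per
     * level, so the encoded payload lands a few KB above the target rather
     * than exactly on it.
     */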
    /**
     * Recursively build a nested JSON object.
     * Each level contributes a random string of approx bytesPerLevel bytes.
     */
    private JsonObject buildRecursivePayload(int level, int maxDepth, int bytesPerLevel, int index) {
        JsonObject obj = new JsonObject()
                .put("level", level)
                .put("seed", index * 31 + level)
                .put("chunk", generateRandomString(bytesPerLevel));
        // Add a bit more variation so each record is unique.
        // ThreadLocalRandom.current() is fetched per use: caching the instance
        // in a static field would bind it to the class-initializing thread.
        obj.put("randomValue", ThreadLocalRandom.current().nextLong());
        // Recurse if we haven't hit max depth
        if (level + 1 < maxDepth) {
            obj.put("child", buildRecursivePayload(level + 1, maxDepth, bytesPerLevel, index));
        }
        return obj;
    }
    /**
     * Returns a random alphanumeric string of exactly sizeBytes characters
     * (one byte each when UTF-8 encoded, since the alphabet is ASCII).
     */
    private String generateRandomString(int sizeBytes) {
        char[] alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
        StringBuilder sb = new StringBuilder(sizeBytes);
        for (int i = 0; i < sizeBytes; i++) {
            sb.append(alphabet[ThreadLocalRandom.current().nextInt(alphabet.length)]);
        }
        return sb.toString();
    }
    @Override
    public void stop() {
        if (client != null) {
            client.close();
        }
    }
}