Guest User

Untitled

a guest
Nov 26th, 2025
27
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.69 KB | None | 0 0
  1. import io.vertx.core.AbstractVerticle;
  2. import io.vertx.core.Future;
  3. import io.vertx.core.Promise;
  4. import io.vertx.core.Vertx;
  5. import io.vertx.core.json.JsonObject;
  6. import io.vertx.pgclient.PgConnectOptions;
  7. import io.vertx.pgclient.PgPool;
  8. import io.vertx.sqlclient.PoolOptions;
  9. import io.vertx.sqlclient.SqlClient;
  10. import io.vertx.sqlclient.Tuple;
  11.  
  12. import java.time.Instant;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Random;
  16. import java.util.concurrent.ThreadLocalRandom;
  17.  
  18. /**
  19.  *
  20.  <dependencies>
  21.  <dependency>
  22.  <groupId>io.vertx</groupId>
  23.  <artifactId>vertx-core</artifactId>
  24.  <version>4.5.8</version>
  25.  </dependency>
  26.  <dependency>
  27.  <groupId>io.vertx</groupId>
  28.  <artifactId>vertx-pg-client</artifactId>
  29.  <version>4.5.8</version>
  30.  </dependency>
  31.  </dependencies>
  32.  */
  33. public class BulkInsertArgoProfiles extends AbstractVerticle {
  34.     private static final Random random = new Random();
  35.     private static final ThreadLocalRandom RAND = ThreadLocalRandom.current();
  36.  
  37.     // Target payload size (approx) for the "measurements" JSONB column
  38.     private static final int TARGET_JSON_SIZE_BYTES = 28 * 1024; // ~28KB
  39.     private static final int ROW_COUNT = 500;
  40.  
  41.     private PgPool client;
  42.  
  43.     public static void main(String[] args) {
  44.         Vertx vertx = Vertx.vertx();
  45.         vertx.deployVerticle(new BulkInsertArgoProfiles(), ar -> {
  46.             if (ar.failed()) {
  47.                 System.err.println("Failed to deploy verticle: " + ar.cause());
  48.                 vertx.close();
  49.             }
  50.         });
  51.     }
  52.  
  53.     @Override
  54.     public void start(Promise<Void> startPromise) {
  55.         // Configure Postgres connection
  56.         PgConnectOptions connectOptions = new PgConnectOptions()
  57.             .setPort(5432)
  58.             .setHost("localhost")
  59.             .setDatabase("postgres")
  60.             .setUser("postgres")
  61.             .setPassword("");
  62.  
  63.         // Configure connection pool
  64.         PoolOptions poolOptions = new PoolOptions()
  65.             .setMaxSize(10);
  66.  
  67.         client = PgPool.pool(vertx, connectOptions, poolOptions);
  68.  
  69.         // 1. Build all payloads first (synchronous)
  70.         List<Tuple> batch = buildBatch();
  71.         System.out.println("Finished building " + batch.size() + " payloads.");
  72.  
  73.         // 2. Time ONLY the insert
  74.         long insertStart = System.currentTimeMillis();
  75.  
  76.         bulkInsertArgoProfiles(client, batch)
  77.             .onSuccess(rows -> {
  78.                 long insertElapsed = System.currentTimeMillis() - insertStart;
  79.                 System.out.println("Successfully inserted " + rows + " rows into argo_profiles.");
  80.                 System.out.println("Insert elapsed ms (DB only): " + insertElapsed);
  81.                 startPromise.complete();
  82.                 vertx.close();
  83.             })
  84.             .onFailure(err -> {
  85.                 System.err.println("Bulk insert failed: " + err.getMessage());
  86.                 err.printStackTrace();
  87.                 startPromise.fail(err);
  88.                 vertx.close();
  89.             });
  90.     }
  91.  
  92.     /**
  93.      * Build all tuples (including JSON payloads) for the batch insert.
  94.      * This is done BEFORE we start timing the insert.
  95.      */
  96.     private List<Tuple> buildBatch() {
  97.         List<Tuple> batch = new ArrayList<>(ROW_COUNT);
  98.  
  99.         for (int i = 0; i < ROW_COUNT; i++) {
  100.             int floatId = random.nextInt();  // ensure (float_id, cycle) is unique enough for testing
  101.             int cycle = 1;
  102.  
  103.             JsonObject measurements = buildMeasurementsJson(i, TARGET_JSON_SIZE_BYTES);
  104.  
  105.             // We send it as a String and cast to jsonb in SQL
  106.             Tuple tuple = Tuple.of(
  107.                 floatId,
  108.                 cycle,
  109.                 measurements.encode() // JSON string
  110.             );
  111.             batch.add(tuple);
  112.         }
  113.  
  114.         return batch;
  115.     }
  116.  
  117.     /**
  118.      * Perform the actual batch insert. Assumes the batch is already built.
  119.      */
  120.     private Future<Integer> bulkInsertArgoProfiles(SqlClient client, List<Tuple> batch) {
  121.         String sql = """
  122.            INSERT INTO argo_profiles (float_id, cycle, measurements)
  123.            VALUES ($1, $2, $3::jsonb)
  124.            ON CONFLICT (float_id, cycle) DO NOTHING
  125.            """;
  126.  
  127.         return client
  128.             .preparedQuery(sql)
  129.             .executeBatch(batch)
  130.             .map(rowSet -> {
  131.                 int count = rowSet.rowCount();
  132.                 System.out.println("Postgres reported rowCount = " + count);
  133.                 return count;
  134.             });
  135.     }
  136.  
  137.     /**
  138.      * Build a JSON object whose nested structure is roughly targetSizeBytes when encoded.
  139.      * We do this recursively: each level adds a chunk of random data plus some metadata.
  140.      */
  141.     private JsonObject buildMeasurementsJson(int index, int targetSizeBytes) {
  142.         int maxDepth = 50; // recursion depth
  143.         int bytesPerLevel = targetSizeBytes / maxDepth;
  144.  
  145.         JsonObject root = new JsonObject()
  146.             .put("meta", new JsonObject()
  147.                 .put("createdAt", Instant.now().toString())
  148.                 .put("index", index)
  149.                 .put("description", "Synthetic profile measurement payload ~28KB"));
  150.  
  151.         // Nested recursive payload under "payload"
  152.         JsonObject payload = buildRecursivePayload(0, maxDepth, bytesPerLevel, index);
  153.         root.put("payload", payload);
  154.  
  155.         return root;
  156.     }
  157.  
  158.     /**
  159.      * Recursively build a nested JSON object.
  160.      * Each level contributes a random string of approx bytesPerLevel bytes.
  161.      */
  162.     private JsonObject buildRecursivePayload(int level, int maxDepth, int bytesPerLevel, int index) {
  163.         JsonObject obj = new JsonObject()
  164.             .put("level", level)
  165.             .put("seed", index * 31 + level)
  166.             .put("chunk", generateRandomString(bytesPerLevel));
  167.  
  168.         // Add a bit more variation so each record is unique
  169.         obj.put("randomValue", RAND.nextLong());
  170.  
  171.         // Recurse if we haven't hit max depth
  172.         if (level + 1 < maxDepth) {
  173.             obj.put("child", buildRecursivePayload(level + 1, maxDepth, bytesPerLevel, index));
  174.         }
  175.  
  176.         return obj;
  177.     }
  178.  
  179.     /**
  180.      * Returns a random string of approximately sizeBytes characters.
  181.      */
  182.     private String generateRandomString(int sizeBytes) {
  183.         char[] alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
  184.         StringBuilder sb = new StringBuilder(sizeBytes);
  185.         for (int i = 0; i < sizeBytes; i++) {
  186.             sb.append(alphabet[RAND.nextInt(alphabet.length)]);
  187.         }
  188.         return sb.toString();
  189.     }
  190.  
  191.     @Override
  192.     public void stop() {
  193.         if (client != null) {
  194.             client.close();
  195.         }
  196.     }
  197. }
  198.  
Advertisement
Add Comment
Please, Sign In to add comment