Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/perl
- use strict;
- use warnings;
- use Pod::Usage;
- use Time::HiRes qw/time sleep/;
- use POSIX;
- use List::Util qw/sum/;
- use Getopt::Long qw/:config no_ignore_case/;
- use DBI;
- use Data::Dumper;
our $VERSION = '1.0.2';
# Unbuffered output; remember the selected handle's name so logger()
# can print to it indirectly (see the "no strict" block in logger()).
local $| = 1;
select(STDOUT);
our $_log = select();
our $_log_level;               # NOTE(review): declared but never used in this chunk
our $_dbh;                     # cached DBI connection, managed by _dbh()
our $_after_round_statement;   # cached prepared handle for --after-round-query
# Compaction thresholds and pacing tunables.
use constant MINIMAL_COMPACT_PAGES => 10;      # skip relations smaller than this
use constant MINIMAL_COMPACT_PERCENT => 20;    # skip when less free space than this
use constant PAGES_PER_ROUND_DIVISOR => 1000;  # clean ~1/1000th of the table per round
use constant MAX_PAGES_PER_ROUND => 5;         # hard cap on pages per round
use constant PAGES_BEFORE_VACUUM_LOWER_DIVISOR => 16;
use constant PAGES_BEFORE_VACUUM_LOWER_THRESHOLD => 1000;
use constant PAGES_BEFORE_VACUUM_UPPER_DIVISOR => 50;
use constant PROGRESS_REPORT_PERIOD => 60;     # seconds between progress reports
use constant LOCKED_ALTER_TIMEOUT => 1000;     # statement_timeout (ms) for index swap attempts
use constant LOCKED_ALTER_COUNT => 100;        # max index swap attempts
# Log levels; logger() suppresses anything below the configured verbosity.
use constant LOG_ALWAYS => 1000;
use constant LOG_ERROR => 2;
use constant LOG_WARNING => 1;
use constant LOG_NOTICE => 0;
# Settings & defaults
my $show_version;
my $show_help;
my $show_man;
# Connection defaults follow psql conventions: localhost:5432, the
# current OS user as both role and database name.
my $db_host = 'localhost';
my $db_port = 5432;
my $db_user = $ENV{LOGNAME} || $ENV{USER} || getpwuid($<);
my $db_passwd = '';
my $db_name = $ENV{LOGNAME} || $ENV{USER} || getpwuid($<);
my $schema_name = 'public';
my $table_name;
# Behaviour switches, populated by GetOptions below.
my $verbose;
my $quiet;
my $force;
my $delay_ratio = 0;
my $after_round_query;
my $routine_vacuum;
my $no_reindex;
my $print_reindex_queries = 0;
my $max_retry_count = 10;
my $all_db;
my $only_schema;
my $exclude_schema;
my $exclude_table;
my %only_schemas;
my %excluded_schemas;
my %excluded_tables;
my $initial_reindex;
my %table_info;
# Parse the command line; on invalid options show the usage and stop
# (show_usage() exits via pod2usage).
unless (GetOptions(
    #help & man
    'V|version' => \$show_version,
    'help|?' => \$show_help,
    'm|man' => \$show_man,
    #database connection parameters
    'h|host=s' => \$db_host,
    'p|port=i' => \$db_port,
    'U|user=s' => \$db_user,
    'W|password=s' => \$db_passwd,
    'd|dbname=s' => \$db_name,
    'n|schema=s' => \$only_schema,
    't|table=s' => \$table_name,
    'v|verbose' => \$verbose,
    'q|quiet' => \$quiet,
    'f|force' => \$force,
    'r|no-reindex' => \$no_reindex,
    's|print-reindex-queries' => \$print_reindex_queries,
    'o|max-retry-count=i' => \$max_retry_count,
    'E|delay-ratio=i' => \$delay_ratio,
    'Q|after-round-query=s' => \$after_round_query,
    'R|routine-vacuum' => \$routine_vacuum,
    'a|all' => \$all_db,
    'N|exclude-schema=s' => \$exclude_schema,
    'T|exclude-table=s' => \$exclude_table,
    'i|initial-reindex' => \$initial_reindex
)) {
    show_usage();
    exit(0);
}
# Map --verbose/--quiet onto the minimum level logger() will print.
my $_verbosity = ($verbose ? LOG_NOTICE : ($quiet ? LOG_ERROR : LOG_WARNING));
# Context for log message prefixes, maintained by the set_/unset_ helpers.
our $_current_db_name;
our $_current_schema_name;
our $_current_table_name;
# Report that no target database was specified and point at the help.
sub not_enough_arguments {
    logger(LOG_ALWAYS, "%s: At least one of the options must be specified: all, dbname", $0);
    logger('quit', "Use --help for short help, --man for full manual.");
}

# Report mutually exclusive options and point at the help.
sub no_together_arguments {
    my @options = @_;
    logger(LOG_ALWAYS, "%s: These options cannot be specified together at once: %s", $0, join(', ', @options));
    logger('quit', "Use --help for short help, --man for full manual.");
}

# Print program name and version.
sub show_version {
    logger(LOG_ALWAYS, "%s v%s", $0, $VERSION);
}
# Short help: NAME, SYNOPSIS and EXAMPLES sections of the POD.
sub show_help {
    pod2usage(
        -message => "",
        -verbose => 99,
        -exitval => 1,
        -sections => 'NAME|SYNOPSIS|EXAMPLES',
    );
}
# Usage line only (shown on bad command-line options).
sub show_usage {
    pod2usage(
        -message => "",
        -verbose => 99,
        -exitval => 1,
        -sections => 'SYNOPSIS',
    );
}
# Full manual page.  NOTE(review): the POD itself is not in this chunk.
sub show_man {
    pod2usage(
        -message => "",
        -verbose => 99,
        -exitval => 1,
        -sections => '|NAME|SYNOPSIS|DESCRIPTION|OPTIONS|LICENSE AND COPYRIGHT|VERSION|AUTHOR'
    );
}
# Remember which database the log prefix should mention.
sub set_current_db_name {
    ($_current_db_name) = @_;
}
# Remember which schema/table the log prefix should mention.
sub set_current_schema_name_table_name {
    ($_current_schema_name, $_current_table_name) = @_;
}
# Forget the current database name.
sub unset_current_db_name {
    undef $_current_db_name;
}
# Forget the current schema/table names.
sub unset_current_schema_name_table_name {
    undef $_current_schema_name;
    undef $_current_table_name;
}
# Drop the cached after-round statement handle so it gets re-prepared
# on the next connection.
sub unset_after_round_statement {
    undef $_after_round_statement;
}
# Print a formatted message to the current log handle.
# $level: numeric severity (LOG_NOTICE..LOG_ALWAYS).  Any non-numeric
#   level (callers pass markers such as 'quit') is coerced to
#   LOG_WARNING -- NOTE(review): no exit happens here despite the
#   'quit' marker; confirm whether trimmed code handled it.
# Messages below the configured $_verbosity are dropped.  LOG_ALWAYS
# messages omit the timestamp prefix; others get "[time] (db:schema.table) ".
sub logger {
    my $level = shift;
    my $message = shift;
    my @message_args = @_;
    # Non-numeric levels fall back to LOG_WARNING.
    $level = LOG_WARNING unless ($level =~ /^\d+$/);
    return if ($level < $_verbosity);
    # $_log holds a handle *name* (string from select()), so strict
    # refs must be disabled for the indirect filehandle print.
    no strict;
    print $_log sprintf("%s%s$message\n", ($level == LOG_ALWAYS ? '' : sprintf("[%s] ", scalar(localtime))), ($_current_db_name ? sprintf("(%s%s) ", $_current_db_name, ($_current_schema_name && $_current_table_name) ? ":$_current_schema_name.$_current_table_name" : "") : ""), @message_args);
    use strict;
}
# Render a byte count as a human-readable string, e.g. "2.000KB".
# Undefined/zero input yields "0.000B"; negative sizes keep their sign.
sub nice_size
{
    my $size = shift;
    $size = 0 unless ($size);
    my @sizes = qw/B KB MB GB TB PB/;
    my $i = 0;
    # Bound the loop at the largest known unit so values beyond PB can
    # never index past the end of @sizes (the old loop could).
    while (abs($size) > 1024 && $i < $#sizes) {
        $size = $size / 1024;
        $i++;
    }
    return sprintf("%.3f$sizes[$i]", $size);
}
#DB procedures
# Accessor for the cached database handle.  Reconnects to the current
# database when the cached handle is missing or no longer answers
# ping; exits the process if reconnection fails.
sub _dbh {
    if (!($_dbh && ref $_dbh && $_dbh->ping)) {
        $_dbh = db_connect($_current_db_name, $db_host, $db_port, $db_user, $db_passwd);
        # Reconnection failed; db_connect already logged the error.
        exit(0) unless ($_dbh && ref $_dbh && $_dbh->ping);
    }
    return $_dbh;
}
# Lazily prepare and cache the optional --after-round-query statement.
# Returns the handle, or undef when no query was given or preparing
# it failed (the failure is logged once).
sub _after_round_statement {
    unless ($_after_round_statement) {
        if ($after_round_query) {
            $_after_round_statement = _dbh->prepare($after_round_query);
            if ($DBI::err) {
                logger(LOG_WARNING, "SQL Error in after round query %s: %s", $after_round_query, $DBI::errstr);
                # BUG FIX: the original said "unset $_after_round_statement;"
                # which is not valid Perl (unset is a shell built-in).
                undef $_after_round_statement;
            }
        }
    }
    return $_after_round_statement;
}
# Open a connection to the given database and cache the handle in
# $_dbh.  Returns the handle, or undef on connection failure (logged).
sub db_connect {
    my ($db_name, $db_host, $db_port, $db_user, $db_password) = @_;
    # A new connection invalidates any statement prepared on the old one.
    unset_after_round_statement();
    logger(LOG_WARNING, "Connecting to database");
    $_dbh = DBI->connect(
        "DBI:Pg:dbname=$db_name;host=$db_host;port=$db_port",
        $db_user, $db_password,
        {RaiseError => 0, PrintError => 0, AutoCommit => 1});
    if ($DBI::err) {
        logger(LOG_ERROR, "Cannot connect DBI:Pg:dbname=%s;host=%s;port=%s user=%s,passwd=...: %s", $db_name, $db_host, $db_port, $db_user, $DBI::errstr);
        return undef;
    }
    # Silence NOTICE chatter from DDL we run later.
    $_dbh->do("set client_min_messages to warning;");
    return $_dbh;
}
# Close the cached database connection.  The $db_name argument is
# accepted for symmetry with db_connect() but is not used.
sub db_disconnect {
    my ($db_name) = @_;
    logger(LOG_WARNING, "Disconnecting from database");
    _dbh->disconnect;
}
# List every database in the cluster except template0, smallest first
# so quick wins come early.  Returns an array ref of names (possibly
# empty), or undef on SQL error.
sub get_databases {
    my $sth = _dbh->prepare("
        SELECT datname FROM pg_catalog.pg_database
        WHERE
            datname NOT IN ('template0')
        ORDER BY pg_catalog.pg_database_size(datname), datname
        ");
    $sth->execute;
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my @result;
    while (my ($db_name) = $sth->fetchrow_array) {
        push @result, $db_name;
    }
    # A reference is always true, so the old "\@result || []" fallback
    # was dead code; an empty result is simply an empty array ref.
    return \@result;
}
# List user tables (skipping pg_catalog's pg_index, temp/toast/catalog
# schemas and information_schema), smallest first.  Returns an array
# ref of {schemaname, tablename} hash refs, or undef on SQL error.
sub get_database_tables {
    my $sth = _dbh->prepare("
        SELECT schemaname, tablename FROM pg_catalog.pg_tables
        WHERE
            NOT (schemaname = 'pg_catalog' AND tablename = 'pg_index') AND
            schemaname !~ 'pg_(temp|toast|catalog).*' AND
            NOT schemaname = 'information_schema'
        ORDER BY
            pg_catalog.pg_relation_size(
                quote_ident(schemaname) || '.' || quote_ident(tablename)),
            schemaname, tablename
        ");
    $sth->execute;
    # BUG FIX: the error check must come *after* execute; the original
    # checked $DBI::err before executing, so execution failures were
    # never caught here.
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my @result;
    while (my $ident = $sth->fetchrow_hashref) {
        push @result, $ident;
    }
    return \@result;
}
# Locate the schema where the pgstattuple() function is installed.
# Returns the schema name, or undef when the extension is absent or
# the lookup fails.
sub get_pgstattuple_schema_name {
    my $sth = _dbh->prepare("
        SELECT nspname FROM pg_catalog.pg_proc
        JOIN pg_catalog.pg_namespace AS n ON pronamespace = n.oid
        WHERE proname = 'pgstattuple' LIMIT 1
        ");
    $sth->execute;
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($schema) = $sth->fetchrow_array;
    return $schema;
}
# Fetch physical size figures for a table: bytes and pages, both for
# the relation alone (size/page_count) and including toast and
# indexes (total_size/total_page_count).
# Returns the hash ref (logging an error when the row is missing),
# or undef on SQL error.
sub get_size_stats {
    my ($schema_name, $table_name) = @_;
    my $sth = _dbh->prepare("
        SELECT
            size,
            total_size,
            ceil(size / bs) AS page_count,
            ceil(total_size / bs) AS total_page_count
        FROM (
            SELECT
                current_setting('block_size')::integer AS bs,
                pg_catalog.pg_relation_size(quote_ident(?)||'.'||quote_ident(?)) AS size,
                pg_catalog.pg_total_relation_size(quote_ident(?)||'.'||quote_ident(?)) AS total_size
        ) AS sq
        ");
    $sth->execute($schema_name, $table_name, $schema_name, $table_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my $stats = $sth->fetchrow_hashref;
    if (!$stats || ref $stats ne 'HASH') {
        logger(LOG_ERROR,"Cannot get size statistics");
    }
    return $stats;
}
# Estimate table bloat with pgstattuple: the page count the table
# would need without bloat (effective_page_count), the reclaimable
# percentage (free_percent) and reclaimable bytes (free_space), all
# adjusted for the table's fillfactor (defaulting to 100).
# Returns the hash ref, or undef when pgstattuple is unavailable or
# the query fails.
sub get_bloat_stats {
    my $schema_name = shift;
    my $table_name = shift;
    my $ident_name = $schema_name.".".$table_name;  # NOTE(review): built but never used below
    my $pgstattuple_schema_name = get_pgstattuple_schema_name;
    return undef unless($pgstattuple_schema_name);
    my $sth = _dbh->prepare("SELECT
        ceil((size - free_space - dead_tuple_len) * 100 / fillfactor / bs) AS effective_page_count,
        greatest(round(
            (100 * (1 - (100 - free_percent - dead_tuple_percent) / fillfactor))::numeric, 2
        ),0) AS free_percent,
        greatest(ceil(size - (size - free_space - dead_tuple_len) * 100 / fillfactor), 0) AS free_space
        FROM (
            SELECT
                current_setting('block_size')::integer AS bs,
                pg_catalog.pg_relation_size(pg_catalog.pg_class.oid) AS size,
                coalesce(
                    (
                        SELECT (
                            regexp_matches(
                                reloptions::text, E'.*fillfactor=(\\\\d+).*'))[1]),
                    '100')::real AS fillfactor,
                pgst.*
            FROM pg_catalog.pg_class
            CROSS JOIN
                " . _dbh->quote_identifier($pgstattuple_schema_name) . ".pgstattuple(
                    (quote_ident(?) || '.' || quote_ident(?))) AS pgst
            WHERE pg_catalog.pg_class.oid = (quote_ident(?) || '.' || quote_ident(?))::regclass
        ) AS sq");
    $sth->execute($schema_name, $table_name, $schema_name, $table_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my $result = $sth->fetchrow_hashref;
    return $result;
}
# Pick the column whose dummy "SET col = col" updates will be used to
# move tuples between pages.  Preference order (see ORDER BY): fixed
# length over variable length (variable-length values risk being
# toasted), non-indexed over indexed, then smaller and lower-numbered
# attributes.  Returns the quoted column name, or undef on SQL error.
sub get_update_column {
    my $schema_name = shift;
    my $table_name = shift;
    my $sth = _dbh->prepare("SELECT quote_ident(attname)
        FROM pg_catalog.pg_attribute
        WHERE
            attnum > 0 AND -- neither system
            NOT attisdropped AND -- nor dropped
            attrelid = (quote_ident(?) || '.' || quote_ident(?))::regclass
        ORDER BY
            -- Variable legth attributes have lower priority because of the chance
            -- of being toasted
            (attlen = -1),
            -- Preferably not indexed attributes
            (
                attnum::text IN (
                    SELECT regexp_split_to_table(indkey::text, ' ')
                    FROM pg_catalog.pg_index
                    WHERE indrelid = (quote_ident(?) || '.' || quote_ident(?))::regclass)),
            -- Preferably smaller attributes
            attlen,
            attnum
        LIMIT 1;");
    $sth->execute($schema_name, $table_name, $schema_name, $table_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($result) = $sth->fetchrow_array;
    return $result;
}
# Number of pages to clean per round: roughly 1/1000th of the table,
# at least one page, capped at MAX_PAGES_PER_ROUND, and never more
# than the pages remaining up to $to_page.
sub get_pages_per_round {
    my ($page_count, $to_page) = @_;
    my $fraction = $page_count / PAGES_PER_ROUND_DIVISOR;
    $fraction = 1 if ($fraction < 1);
    $fraction = MAX_PAGES_PER_ROUND if ($fraction > MAX_PAGES_PER_ROUND);
    my $pages = ceil($fraction);
    $pages = $to_page if ($pages > $to_page);
    return $pages;
}
# How many pages to clean between VACUUM runs: 1/16th of the table,
# switching to 1/1000th once that exceeds the threshold, but never
# below 1/50th of the expected final page count.
sub get_pages_before_vacuum {
    my ($page_count, $expected_page_count) = @_;
    my $pages = $page_count / PAGES_BEFORE_VACUUM_LOWER_DIVISOR;
    if ($pages >= PAGES_BEFORE_VACUUM_LOWER_THRESHOLD) {
        $pages = $page_count / PAGES_BEFORE_VACUUM_LOWER_THRESHOLD;
    }
    my $minimum = $expected_page_count / PAGES_BEFORE_VACUUM_UPPER_DIVISOR;
    $pages = $minimum if ($pages < $minimum);
    return ceil($pages);
}
# Upper bound on tuples per page for the table: block size divided by
# the summed widths of the system (negative attnum) attributes.
# Returns the bound, or undef on SQL error / missing result (logged).
sub get_max_tupples_per_page {
    my ($schema_name, $table_name) = @_;
    my $ident_name = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name);
    my $sth = _dbh->prepare("
        SELECT ceil(current_setting('block_size')::real / sum(attlen))
        FROM pg_catalog.pg_attribute
        WHERE
            attrelid = '$ident_name'::regclass AND
            attnum < 0;
        ");
    $sth->execute;
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($max_tupples) = $sth->fetchrow_array;
    unless (defined $max_tupples) {
        logger(LOG_ERROR, 'Can not get max tupples per page.');
    }
    return $max_tupples;
}
# Count enabled ALWAYS/REPLICA triggers on the table matching tgtype
# bit 16 (presumably the UPDATE trigger bit -- confirm against
# pg_trigger docs); such triggers would fire during our dummy updates.
# Returns the count (truthy when present), or undef on SQL error.
sub has_triggers {
    my ($schema_name, $table_name) = @_;
    my $ident_name = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name);
    my $sth = _dbh->prepare("SELECT count(1) FROM pg_catalog.pg_trigger
        WHERE
            tgrelid = '$ident_name'::regclass AND
            tgenabled IN ('A', 'R') AND
            (tgtype & 16)::boolean");
    $sth->execute;
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($trigger_count) = $sth->fetchrow_array;
    return $trigger_count;
}
# Take the cross-instance advisory lock for the table so two copies
# of this tool never process the same table at once.
# Returns 1/0 (lock acquired or not), undef on SQL error.
sub try_advisory_lock {
    my ($schema_name, $table_name) = @_;
    my $sth = _dbh->prepare("
        SELECT pg_try_advisory_lock(
            'pg_catalog.pg_class'::regclass::integer,
            (quote_ident(?)||'.'||quote_ident(?))::regclass::integer)::integer;
        ");
    $sth->execute($schema_name, $table_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($lock) = $sth->fetchrow_array;
    unless ($lock) {
        logger(LOG_NOTICE, "Skipping processing: another instance is working with table %s.%s", $schema_name, $table_name);
    }
    return $lock;
}
# Run VACUUM (or VACUUM ANALYZE when $analyze is true) on the table.
# Returns nothing on success, undef on SQL error (logged).
sub vacuum {
    my ($schema_name, $table_name, $analyze) = @_;
    my $ident = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name);
    _dbh->do('VACUUM ' . ($analyze ? 'ANALYZE ' : '') . $ident);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# Run ANALYZE on the table.  Returns nothing on success, undef on SQL
# error (logged).
# FIX: use the _dbh() accessor rather than the raw $_dbh global so a
# dropped connection is transparently re-opened, consistent with
# vacuum() and the other DB helpers.
sub analyze {
    my $schema_name = shift;
    my $table_name = shift;
    _dbh->do("ANALYZE "._dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name));
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# Switch the session to replica role so ordinary triggers and rules
# do not fire during the dummy updates (the server-side clean-pages
# function refuses to run otherwise).
# FIX: use the _dbh() accessor rather than the raw $_dbh global so a
# dropped connection is transparently re-opened, consistent with the
# other DB helpers.
sub set_session_replication_role {
    _dbh->do('set session_replication_role to replica;');
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# Install public.pgcompact_clean_pages_<pid>(), the server-side worker
# that empties the page range (i_to_page - i_page_offset, i_to_page]
# by repeatedly issuing "UPDATE col = col" on every possible ctid in
# the range, so live tuples migrate to free space on earlier pages.
# The function returns the new last used page, or -1 when an updated
# tuple landed *beyond* the range (table is growing).  It requires
# session_replication_role = replica (see set_session_replication_role).
# This wrapper returns 1 on success, undef on SQL error (logged).
sub create_clean_pages_function {
    _dbh->do("
        CREATE OR REPLACE FUNCTION public.pgcompact_clean_pages_$$(
            i_table_ident text,
            i_column_ident text,
            i_to_page integer,
            i_page_offset integer,
            i_max_tupples_per_page integer)
        RETURNS integer
        LANGUAGE plpgsql AS \$\$
        DECLARE
            _from_page integer := i_to_page - i_page_offset + 1;
            _min_ctid tid;
            _max_ctid tid;
            _ctid_list tid[];
            _next_ctid_list tid[];
            _ctid tid;
            _loop integer;
            _result_page integer;
            _update_query text :=
                'UPDATE ONLY ' || i_table_ident ||
                ' SET ' || i_column_ident || ' = ' || i_column_ident ||
                ' WHERE ctid = ANY(\$1) RETURNING ctid';
        BEGIN
            -- Check page argument values
            IF NOT (
                i_page_offset IS NOT NULL AND i_page_offset >= 1 AND
                i_to_page IS NOT NULL AND i_to_page >= 1 AND
                i_to_page >= i_page_offset)
            THEN
                RAISE EXCEPTION 'Wrong page arguments specified.';
            END IF;
            -- Check that session_replication_role is set to replica to
            -- prevent triggers firing
            IF NOT (
                SELECT setting = 'replica'
                FROM pg_catalog.pg_settings
                WHERE name = 'session_replication_role')
            THEN
                RAISE EXCEPTION 'The session_replication_role must be set to replica.';
            END IF;
            -- Define minimal and maximal ctid values of the range
            _min_ctid := (_from_page, 1)::text::tid;
            _max_ctid := (i_to_page, i_max_tupples_per_page)::text::tid;
            -- Build a list of possible ctid values of the range
            SELECT array_agg((pi, ti)::text::tid)
            INTO _ctid_list
            FROM generate_series(_from_page, i_to_page) AS pi
            CROSS JOIN generate_series(1, i_max_tupples_per_page) AS ti;
            <<_outer_loop>>
            FOR _loop IN 1..i_max_tupples_per_page LOOP
                _next_ctid_list := array[]::tid[];
                -- Update all the tuples in the range
                FOR _ctid IN EXECUTE _update_query USING _ctid_list
                LOOP
                    IF _ctid > _max_ctid THEN
                        _result_page := -1;
                        EXIT _outer_loop;
                    ELSIF _ctid >= _min_ctid THEN
                        -- The tuple is still in the range, more updates are needed
                        _next_ctid_list := _next_ctid_list || _ctid;
                    END IF;
                END LOOP;
                _ctid_list := _next_ctid_list;
                -- Finish processing if there are no tupples in the range left
                IF coalesce(array_length(_ctid_list, 1), 0) = 0 THEN
                    _result_page := _from_page - 1;
                    EXIT _outer_loop;
                END IF;
            END LOOP;
            -- No result
            IF _loop = i_max_tupples_per_page AND _result_page IS NULL THEN
                RAISE EXCEPTION
                    'Maximal loops count has been reached with no result.';
            END IF;
            RETURN _result_page;
        END \$\$;
        ");
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return 1;
}
# Remove the per-PID server-side helper installed by
# create_clean_pages_function().  Returns nothing on success, undef
# on SQL error (logged).
sub drop_clean_pages_function {
    _dbh->do("
        DROP FUNCTION public.pgcompact_clean_pages_$$(text, text,integer, integer, integer);
        ");
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# Run one round of the server-side page cleaner on the table.
# Returns the new last used page as reported by
# pgcompact_clean_pages_<pid>() (-1 means a tuple escaped past the
# range), or undef on SQL error.
sub clean_pages {
    my ($schema_name, $table_name, $column_name, $to_page,
        $pages_per_round, $max_tupples_per_page) = @_;
    my $ident_name = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name);
    my $sth = _dbh->prepare("
        SELECT public.pgcompact_clean_pages_$$(?,?,?,?,?)
        ");
    $sth->execute($ident_name, $column_name, $to_page, $pages_per_round, $max_tupples_per_page);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my ($result_page) = $sth->fetchrow_array;
    return $result_page;
}
# Collect metadata for every index of the table, smallest first:
# name, tablespace, definition, access method, backing constraint
# (PRIMARY KEY/UNIQUE) if any, and an "allowed" flag computed from
# pg_depend that is true only when the index can be swapped without
# heavy locks (no dependencies other than the expected ones; on
# PostgreSQL < 9.1 constraint-backed indexes are never allowed since
# ALTER TABLE ... ADD CONSTRAINT ... USING INDEX is unavailable there).
# Returns an array ref of hash refs, or undef on SQL error.
sub get_index_data_list {
    my $schema_name = shift;
    my $table_name = shift;
    my $sth = _dbh->prepare("
        SELECT
            indexname, tablespace, indexdef,
            regexp_replace(indexdef, E'.* USING (\\\\w+) .*', E'\\\\1') AS indmethod,
            conname,
            CASE
                WHEN contype = 'p' THEN 'PRIMARY KEY'
                WHEN contype = 'u' THEN 'UNIQUE'
                ELSE NULL END AS contypedef,
            (
                SELECT
                    bool_and(
                        deptype IN ('n', 'a', 'i') AND
                        NOT (refobjid = indexoid AND deptype = 'n') AND
                        NOT (
                            objid = indexoid AND deptype = 'i' AND
                            (version < array[9,1] OR contype NOT IN ('p', 'u'))))
                FROM pg_catalog.pg_depend
                LEFT JOIN pg_catalog.pg_constraint ON
                    pg_catalog.pg_constraint.oid = refobjid
                WHERE
                    (objid = indexoid AND classid = pgclassid) OR
                    (refobjid = indexoid AND refclassid = pgclassid)
            )::integer AS allowed,
            pg_catalog.pg_relation_size(indexoid)
        FROM (
            SELECT
                indexname, tablespace, indexdef,
                (
                    quote_ident(schemaname) || '.' ||
                    quote_ident(indexname))::regclass AS indexoid,
                'pg_catalog.pg_class'::regclass AS pgclassid,
                string_to_array(
                    regexp_replace(
                        version(), E'.*PostgreSQL (\\\\d+\\\\.\\\\d+).*', E'\\\\1'),
                    '.')::integer[] AS version
            FROM pg_catalog.pg_indexes
            WHERE
                schemaname = quote_ident(?) AND
                tablename = quote_ident(?)
        ) AS sq
        LEFT JOIN pg_catalog.pg_constraint ON
            conindid = indexoid AND contype IN ('p', 'u')
        ORDER BY 8
        ");
    $sth->execute($schema_name, $table_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my @result;
    while(my $result = $sth->fetchrow_hashref) {
        push @result, $result;
    }
    return \@result;
}
# Size (bytes) and page count of an index.  Returns the hash ref only
# when both fields came back defined; otherwise undef (including on
# SQL error, which is logged).
sub get_index_size_statistics {
    my ($schema_name, $index_name) = @_;
    my $sth = _dbh->prepare("
        SELECT size, ceil(size / bs) AS page_count
        FROM (
            SELECT
                pg_catalog.pg_relation_size((quote_ident(?) || '.' || quote_ident(?))::regclass) AS size,
                current_setting('block_size')::real AS bs
        ) AS sq
        ");
    $sth->execute($schema_name, $index_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my $stats = $sth->fetchrow_hashref;
    if ($stats && ref $stats eq 'HASH' && defined $stats->{size} && defined $stats->{page_count}) {
        return $stats;
    }
    return undef;
}
# Build a CREATE INDEX CONCURRENTLY statement that recreates the
# given index under the temporary name pgcompact_index_<pid>,
# re-attaching its tablespace clause (before any WHERE predicate)
# when one is set.
sub get_reindex_query {
    my ($index_data) = @_;
    my $sql = $index_data->{indexdef};
    $sql =~ s/INDEX (\S+)/INDEX CONCURRENTLY pgcompact_index_$$/;
    if (defined $index_data->{tablespace}) {
        $sql =~ s/( WHERE .*|$)/ TABLESPACE $index_data->{tablespace}$1/;
    }
    return $sql;
}
# Build the SQL that swaps the freshly built temporary index
# (pgcompact_index_<pid>) in place of the original one.
# - Constraint-backed indexes (PRIMARY KEY/UNIQUE): drop the
#   constraint and re-add it USING the new index, inside a
#   transaction with a short statement_timeout so we never block long.
# - Plain indexes: rename old -> tmp, new -> old name, then drop tmp
#   concurrently outside the transaction.
# The caller (alter_index) splits this script on ';'.
sub get_alter_index_query {
    my $schema_name = shift;
    my $table_name = shift;
    my $index_data = shift;
    # BUG FIX: the original used "my $x = EXPR if COND;", whose
    # behaviour is explicitly undefined in Perl (perlsyn); declare
    # first, assign conditionally.
    my $constraint_ident;
    if ($index_data && ref $index_data eq 'HASH' && $index_data->{conname}) {
        $constraint_ident = _dbh->quote_identifier($index_data->{conname});
    }
    my $table_ident = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($table_name);
    if ($constraint_ident) {
        return
            "BEGIN; SET LOCAL statement_timeout TO " . LOCKED_ALTER_TIMEOUT . ";
            ALTER TABLE $table_ident DROP CONSTRAINT $constraint_ident;
            ALTER TABLE $table_ident ADD CONSTRAINT $constraint_ident $index_data->{contypedef} USING INDEX pgcompact_index_$$;
            END;";
    } else {
        # Random temporary name for the old index while it is swapped out.
        my $tmp_index_name = "tmp_".int(rand(1000000000));
        return
            "BEGIN;
            ALTER INDEX " . _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($index_data->{indexname}) . " RENAME TO " . _dbh->quote_identifier($tmp_index_name) . ";
            ALTER INDEX " . _dbh->quote_identifier($schema_name) . ".pgcompact_index_$$ RENAME TO " . _dbh->quote_identifier($index_data->{indexname}) . ";
            END;
            DROP INDEX CONCURRENTLY " . _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($tmp_index_name) . ";";
    }
}
# Build a plain (locking) REINDEX statement for indexes we cannot
# rebuild concurrently; shown to the operator, not executed here.
# The $table_name argument is unused but kept for interface symmetry.
# BUG FIX: the original wrapped the identifier as
# "REINDEX INDEX ('\"s\".\"i\"')", which is invalid SQL -- REINDEX
# takes a bare (possibly quoted) name, no parentheses or single quotes.
sub get_straight_reindex_query {
    my $schema_name = shift;
    my $table_name = shift;
    my $index_data = shift;
    return "REINDEX INDEX " . _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier($index_data->{indexname});
}
# Estimate index bloat with pgstatindex: reclaimable percentage and
# bytes, computed from avg_leaf_density against the index fillfactor
# (default 90).  'NaN' density (empty index) maps to zeros.
# Returns the hash ref, or undef when pgstattuple is unavailable, the
# query fails, or either figure is missing/zero -- NOTE(review): the
# truthiness check below also turns a legitimate 0 free_percent into
# undef; confirm that callers rely on this.
# NOTE(review): unlike get_bloat_stats, the schema is interpolated
# here without quote_identifier -- fails for schemas needing quoting.
sub get_index_bloat_stats {
    my $schema_name = shift;
    my $index_name = shift;
    my $pgstattuple_schema_name = get_pgstattuple_schema_name;
    return undef unless($pgstattuple_schema_name);
    my $sth = _dbh->prepare("
        SELECT
            CASE
                WHEN avg_leaf_density = 'NaN' THEN 0
                ELSE
                    round(
                        (100 * (1 - avg_leaf_density / fillfactor))::numeric, 2
                    )
                END AS free_percent,
            CASE
                WHEN avg_leaf_density = 'NaN' THEN 0
                ELSE
                    ceil(
                        index_size * (1 - avg_leaf_density / fillfactor)
                    )
                END AS free_space
        FROM (
            SELECT
                coalesce(
                    (
                        SELECT (
                            regexp_matches(
                                reloptions::text, E'.*fillfactor=(\\\\d+).*'))[1]),
                    '90')::real AS fillfactor,
                pgsi.*
            FROM pg_catalog.pg_class
            CROSS JOIN $pgstattuple_schema_name.pgstatindex(
                quote_ident(?) || '.' || quote_ident(?)) AS pgsi
            WHERE pg_catalog.pg_class.oid = (quote_ident(?) || '.' || quote_ident(?))::regclass
        ) AS oq
        ");
    $sth->execute($schema_name, $index_name, $schema_name, $index_name);
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    my $result = $sth->fetchrow_hashref;
    return ($result && ref $result eq 'HASH' && $result->{'free_percent'} && $result->{'free_space'}) ? $result : undef;
}
# Build the temporary replacement index with CREATE INDEX CONCURRENTLY.
# Returns nothing on success, undef on SQL error (logged).
sub reindex {
    my ($index_data) = @_;
    _dbh->do(get_reindex_query($index_data));
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# Execute the index-swap statements one at a time (do() takes a single
# statement, so the script from get_alter_index_query is split on ';').
# On SQL error, closes any open transaction with END so the
# connection stays usable, and returns undef.
# BUG FIX: the original fell off the end of the sub on success, so the
# caller's defined() check saw undef on *success* too; return an
# explicit true value.
sub alter_index {
    my $schema_name = shift;
    my $table_name = shift;
    my $index_data = shift;
    foreach my $sql (split(/;/, get_alter_index_query($schema_name, $table_name, $index_data))) {
        next if ($sql =~ /^\s*$/);
        _dbh->do("$sql;");
        if ($DBI::err) {
            logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
            _dbh->do("END;");
            return undef;
        }
    }
    return 1;
}
# Drop the leftover temporary index after a failed swap, under a short
# statement_timeout so we never block long.
# BUG FIX: identifiers cannot be passed as '?' bind placeholders (the
# driver would quote the value as a string literal, which is invalid
# in DDL); build the properly quoted name instead.
sub drop_temp_index {
    my $schema_name = shift;
    _dbh->do("SET LOCAL statement_timeout TO " . LOCKED_ALTER_TIMEOUT . ";");
    my $index_ident = _dbh->quote_identifier($schema_name) . "." . _dbh->quote_identifier("pgcompact_index_$$");
    _dbh->do("DROP INDEX CONCURRENTLY $index_ident;");
    if ($DBI::err) {
        logger(LOG_ERROR, "SQL Error: %s", $DBI::errstr);
        return undef;
    }
    return;
}
# PID of the server backend serving the current connection.
sub get_pg_backend_pid {
    my ($backend_pid) = _dbh->selectrow_array("select pg_backend_pid();");
    return $backend_pid;
}
# Rebuild every bloated btree index of the table: CREATE INDEX
# CONCURRENTLY under a temporary name, then swap it in under a short
# lock (see get_alter_index_query), retrying the swap up to
# LOCKED_ALTER_COUNT times on lock timeout.
# Arguments: table name, schema name, database name (only echoed in
# logged queries), and a flag to print the reindex queries.
# Returns 1 when every attempted index was rebuilt, 0 when a swap lock
# could not be acquired, undef when nothing was attempted or a
# mid-loop error aborted processing, -1 on an early SQL error.
# NOTE(review): callers in this chunk pass only three arguments, so
# the $print_reindex_queries parameter shadows the global with undef;
# confirm the intended source of that flag.
sub reindex_table {
    my $table_name = shift;
    my $schema_name = shift;
    my $db_name = shift;
    my $print_reindex_queries = shift;
    my $is_reindexed;
    my $index_data_list = get_index_data_list($schema_name, $table_name) || [];
    if ($DBI::err) {
        logger(LOG_ERROR, "Table handling interrupt.");
        return -1;
    }
    for my $index_data (@$index_data_list) {
        my $initial_index_size_stats = get_index_size_statistics($schema_name, $index_data->{indexname});
        if (!$initial_index_size_stats || ref $initial_index_size_stats ne 'HASH') {
            logger(LOG_ERROR, "Cannot get index size statistics.");
            return;
        }
        if ($initial_index_size_stats->{page_count} <= 1) {
            logger(LOG_NOTICE, "Skipping reindex: %s.%s, empty or 1 page index.", $schema_name, $index_data->{indexname});
            next;
        }
        my $index_bloat_stats;
        if (! $force) {
            if ($index_data->{indmethod} ne 'btree') {
                logger(LOG_NOTICE, "Skipping reindex: %s.%s is a %s index not a btree, reindexing is up to you.", $schema_name, $index_data->{indexname}, $index_data->{indmethod});
                logger(LOG_WARNING, "Reindex queries: %s.%s, initial size %d pages (%s)", $schema_name, $index_data->{indexname}, $initial_index_size_stats->{page_count}, nice_size($initial_index_size_stats->{size}));
                # BUG FIX: "allowed" lives directly on $index_data (see
                # get_index_data_list); the old {data}{allowed} lookup
                # was always false, so the concurrent queries were
                # never shown.
                if ($index_data->{allowed}) {
                    logger(LOG_WARNING, "%s; --%s", get_reindex_query($index_data), $db_name);
                    logger(LOG_WARNING, "%s; --%s", get_alter_index_query($schema_name, $table_name, $index_data), $db_name);
                } else {
                    logger(LOG_WARNING, "%s; --%s", get_straight_reindex_query($schema_name, $table_name, $index_data), $db_name);
                }
                next;
            }
            if ($initial_index_size_stats->{page_count} < MINIMAL_COMPACT_PAGES) {
                logger(LOG_NOTICE, "Skipping reindex: %s.%s, %d pages from %d pages minimum required.",$schema_name, $index_data->{indexname}, $initial_index_size_stats->{page_count}, MINIMAL_COMPACT_PAGES);
                next;
            }
            $index_bloat_stats = get_index_bloat_stats($schema_name, $index_data->{indexname});
            if ($index_bloat_stats && ref $index_bloat_stats eq 'HASH' && $index_bloat_stats->{'free_percent'} < MINIMAL_COMPACT_PERCENT) {
                logger(LOG_NOTICE, "Skipping reindex: %s.%s, %d%% space to compact from %d%% minimum required.", $schema_name, $index_data->{indexname}, $index_bloat_stats->{free_percent}, MINIMAL_COMPACT_PERCENT);
                next;
            }
        }
        if (not $index_data->{'allowed'}) {
            logger(LOG_NOTICE, "Skipping reindex: %s.%s, can not reindex without heavy locks because of its dependencies, reindexing is up to you.", $schema_name, $index_data->{indexname});
            logger(LOG_WARNING, "Reindex queries%s: %s.%s, initial size %d pages (%s), will be reduced by %d%% (%s)",
                ($force ? ' forced' : ''),
                $schema_name,
                $index_data->{indexname},
                $initial_index_size_stats->{page_count},
                nice_size($initial_index_size_stats->{size}),
                # $index_bloat_stats is undef when --force skipped the
                # bloat check; default to 0 instead of warning on undef.
                ($index_bloat_stats ? $index_bloat_stats->{free_percent} : 0),
                nice_size($index_bloat_stats ? $index_bloat_stats->{free_space} : 0)
            );
            logger(LOG_WARNING, "%s; --%s", get_reindex_query($index_data), $db_name);
            logger(LOG_WARNING, "%s; --%s", get_alter_index_query($schema_name, $table_name, $index_data), $db_name);
            next;
        }
        if (!$no_reindex) {
            my $start_reindex_time = time;
            reindex($index_data);
            if ($DBI::err) {
                # BUG FIX: the format has two %s but only the name was
                # passed; supply the error text for the second one.
                logger(LOG_NOTICE, "Skipping index %s: %s", $index_data->{indexname}, $DBI::errstr);
                next;
            }
            my $reindex_time = time - $start_reindex_time;
            # Retry the locked swap while it keeps timing out.
            my $locked_alter_attempt = 0;
            while ($locked_alter_attempt < LOCKED_ALTER_COUNT) {
                unless (defined(alter_index($schema_name, $table_name, $index_data))) {
                    if ($DBI::errstr && $DBI::errstr =~ /canceling statement due to statement timeout/) {
                        $locked_alter_attempt++;
                        next;
                    } else {
                        logger(LOG_ERROR, $@);
                        return;
                    }
                }
                $reindex_time = time - $start_reindex_time;
                last;
            }
            if ($locked_alter_attempt < LOCKED_ALTER_COUNT) {
                my $new_stats = get_index_size_statistics($schema_name, $index_data->{indexname});
                my $free_percent = 100 * (1 - $new_stats->{size} / $initial_index_size_stats->{size});
                my $free_space = ($initial_index_size_stats->{size} - $new_stats->{size});
                logger(LOG_WARNING, "Reindex%s: %s.%s, initial size %d pages(%s), has been reduced by %d%% (%s), duration %d seconds, attempts %d.",
                    ($force ? " forced" : ""),
                    $schema_name,
                    $index_data->{indexname},
                    $initial_index_size_stats->{page_count},
                    nice_size($initial_index_size_stats->{size}),
                    int($free_percent),
                    nice_size($free_space),
                    $reindex_time,
                    $locked_alter_attempt
                );
                # Stay 1 only while every index so far succeeded.
                $is_reindexed = (defined $is_reindexed) ? ($is_reindexed and 1) : 1;
            } else {
                my $drop_index_time = time;
                drop_temp_index($schema_name);
                $reindex_time += time - $drop_index_time;
                # BUG FIX: the last placeholder was %d but receives the
                # nice_size() string; use %s.
                logger(LOG_WARNING, "Reindex%s: %s.%s, lock has not been acquired, initial size %d pages(%s)",
                    ($force ? " forced" : ""),
                    $schema_name,
                    $index_data->{indexname},
                    $initial_index_size_stats->{page_count},
                    nice_size($initial_index_size_stats->{size})
                );
                $is_reindexed = 0;
            }
        }
        if ($print_reindex_queries) {
            logger(LOG_WARNING, "Reindex queries%s: %s.%s, initial size %d pages (%s), will be reduced by %d%% (%s)",
                ($force ? ' forced' : ''),
                $schema_name,
                $index_data->{indexname},
                $initial_index_size_stats->{page_count},
                nice_size($initial_index_size_stats->{size}),
                ($index_bloat_stats ? ($index_bloat_stats->{free_percent}||0) : 0),
                nice_size($index_bloat_stats ? $index_bloat_stats->{free_space} : 0)
            );
            logger(LOG_WARNING, "%s; --%s", get_reindex_query($index_data), $db_name);
            logger(LOG_WARNING, "%s; --%s", get_alter_index_query($schema_name, $table_name, $index_data), $db_name);
        }
    }
    return $is_reindexed;
}
- #Process function
- sub process {
- my $schema_name = shift;
- my $table_name = shift;
- my $attempt = shift;
- my $table_info = shift;
- my $is_skipped;
- my $is_locked = try_advisory_lock($schema_name, $table_name) ? 0 : 1;
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $table_info->{stats} = get_size_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $table_info->{base_stats} = {%{$table_info->{stats}}} unless ($table_info->{base_stats});
- if (!$is_locked) {
- my $vacuum_time = time;
- vacuum($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $vacuum_time = time - $vacuum_time;
- $table_info->{stats} = get_size_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- logger(LOG_NOTICE, "Vacuum initial: %d pages left, duration %.3f seconds.", ($table_info->{stats}{page_count}||0), $vacuum_time);
- if($table_info->{stats}{page_count} <= 1) {
- logger(LOG_NOTICE, "Skipping processing: empty or 1 page table.");
- $is_skipped = 1;
- }
- }
- my $bloat_stats;
- my $is_reindexed;
- if (!$is_locked && !$is_skipped) {
- if ($initial_reindex && !$no_reindex && defined($attempt) && $attempt == 0) {
- $is_reindexed = reindex_table($table_name, $schema_name, $db_name);
- }
- my $get_stat_time = time;
- $bloat_stats = get_bloat_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $get_stat_time = time - $get_stat_time;
- if ($bloat_stats->{effective_page_count}) {
- logger(LOG_NOTICE,"Bloat statistics with pgstattuple: duration %.3f seconds.", $get_stat_time);
- } else {
- my $analyze_time = time;
- analyze($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $analyze_time = time - $analyze_time;
- logger(LOG_WARNING, "Analyze required initial: duration %.3f second.", $analyze_time);
- $get_stat_time = time;
- $bloat_stats = get_bloat_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $get_stat_time = time - $get_stat_time;
- if ($bloat_stats->{effective_page_count}) {
- logger(LOG_NOTICE, "Bloat statistics with pgstattuple: duration %.3f seconds.", $get_stat_time)
- } else {
- logger('qiuet', "Can not get bloat statistics, processing stopped.");
- $is_skipped = 1;
- }
- }
- }
- if (!$is_locked && !$is_skipped) {
- my $can_be_compacted = ($bloat_stats->{'free_percent'} > 0 && ($table_info->{stats}{page_count} > $bloat_stats->{effective_page_count}));
- if ($can_be_compacted) {
- logger(LOG_WARNING, "Statistics: %d pages (%d pages including toasts and indexes) , approximately %0.3f%% (%d pages) can be compacted reducing the size by %s.",
- $table_info->{stats}{page_count},
- $table_info->{stats}{total_page_count},
- $bloat_stats->{'free_percent'},
- ($table_info->{stats}{page_count} - $bloat_stats->{effective_page_count}),
- nice_size($bloat_stats->{free_space})
- );
- } else {
- logger(LOG_WARNING, "Statistics: %d pages (%d pages including toasts and indexes)",
- $table_info->{stats}{page_count},
- $table_info->{stats}{total_page_count}
- );
- }
- if(has_triggers($schema_name, $table_name)) {
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- logger(LOG_ERROR,'Can not process: "always" or "replica" triggers are on.');
- $is_skipped = 1;
- }
- if(!$force) {
- if($table_info->{stats}{page_count} < MINIMAL_COMPACT_PAGES) {
- logger(LOG_NOTICE,"Skipping processing: %d pages from %d pages minimum required.'", $table_info->{stats}{page_count}, MINIMAL_COMPACT_PAGES);
- $is_skipped = 1;
- }
- if(($bloat_stats->{free_percent}||0) <= MINIMAL_COMPACT_PERCENT) {
- logger(LOG_NOTICE,"Skipping processing: %0.2f%% space to compact from 20%% minimum required.",($bloat_stats->{free_percent}||0));
- $is_skipped = 1;
- }
- }
- }
- if (!$is_locked && !$is_skipped) {
- logger(LOG_WARNING, "Processing forced.") if ($force);
- my $vacuum_page_count = 0;
- my $initial_size_stats = {%{$table_info->{stats}}};
- my $to_page = $table_info->{stats}{page_count} - 1;
- my $progress_report_time = time;
- my $clean_pages_total_duration = 0;
- my $last_loop = $table_info->{stats}{page_count} + 1;
- my $expected_error_occurred = 0;
- my $expected_page_count = $table_info->{stats}{page_count};
- my $column_name = get_update_column($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- my $pages_per_round = get_pages_per_round($table_info->{stats}{page_count}, $to_page);
- my $pages_before_vacuum = get_pages_before_vacuum($table_info->{stats}{page_count}, $expected_page_count);
- my $max_tupples_per_page = get_max_tupples_per_page($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- logger(LOG_NOTICE, "Update by column: %s.", $column_name||'');
- logger(LOG_NOTICE, "Set pages/round: %d.", $pages_per_round);
- logger(LOG_NOTICE, "Set pages/vacuum: %d.", $pages_before_vacuum);
- my $start_time;
- my $last_to_page;
- my $loop;
- for ($loop = $table_info->{stats}{page_count}; $loop > 0; $loop--) {
- $start_time = time;
- _dbh->begin_work;
- $last_to_page = $to_page;
- $to_page = clean_pages($schema_name, $table_name, $column_name, $last_to_page, $pages_per_round, $max_tupples_per_page);
- $clean_pages_total_duration += (time - $start_time);
- unless(defined($to_page)) {
- _dbh->rollback();
- if ($DBI::err && $DBI::errstr =~ 'deadlock detected') {
- logger(LOG_ERROR,"Detected deadlock during cleaning.");
- next;
- } elsif ($DBI::err && $DBI::errstr =~ 'cannot extract system attribute') {
- logger(LOG_ERROR, "System attribute extraction error has occurred, processing stopped.");
- $expected_error_occurred = 1;
- last;
- } elsif($DBI::err) {
- logger(LOG_ERROR,"Cannot handle table: %s", $DBI::errstr);
- return -1;
- } else {
- logger(LOG_ERROR, "Incorrect result of cleaning: column_name %s, to_page %d, pages_per_round %d, max_tupples_per_page %d.",$column_name, $last_to_page, $pages_per_round, $max_tupples_per_page);
- }
- } else {
- if ($to_page == -1) {
- _dbh->rollback();
- $to_page = $last_to_page;
- last;
- }
- _dbh->commit();
- }
- sleep($delay_ratio * (time - $start_time));
- if (_after_round_statement) {
- _after_round_statement->execute();
- if ($DBI::err) {
- logger(LOG_ERROR, "SQL Error in after round statement: %s", $DBI::errstr);
- }
- }
- if (time - $progress_report_time >= PROGRESS_REPORT_PERIOD && $last_to_page != $to_page) {
- logger(LOG_WARNING, "Progress: %s %d pages completed.", (defined $bloat_stats->{effective_page_count} ? int(100 * ($to_page ? ($initial_size_stats->{page_count} - $to_page - 1) / ($table_info->{base_stats}{page_count} - $bloat_stats->{effective_page_count}) : 1) ).'%, ' : ' '), ($table_info->{stats}{page_count} - $to_page - 1));
- $progress_report_time = time;
- }
- $expected_page_count -= $pages_per_round;
- $vacuum_page_count += ($last_to_page - $to_page);
- if ($routine_vacuum && $vacuum_page_count >= $pages_before_vacuum) {
- my $duration = $clean_pages_total_duration / ($last_loop - $loop);
- my $average_duration = $duration == 0 ? 0.0001 : $duration;
- logger(LOG_WARNING, "Cleaning in average: %.1f pages/second (%.3f seconds per %d pages).", ($pages_per_round / $average_duration), $duration, $pages_per_round);
- $clean_pages_total_duration = 0;
- $last_loop = $loop;
- my $vacuum_time = time;
- vacuum($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $vacuum_time = time - $vacuum_time;
- $table_info->{stats} = get_size_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- if ($table_info->{stats}{page_count} > $to_page + 1) {
- logger(LOG_NOTICE, "Vacuum routine: can not clean %d pages, %d pages left, duration %0.3f seconds.", ($table_info->{stats}{page_count} - $to_page - 1), $table_info->{stats}{page_count}, $vacuum_time);
- } else {
- logger(LOG_NOTICE, "Vacuum routine: %d pages left, duration %.3f seconds.", ($table_info->{stats}{page_count}||0), $vacuum_time);
- }
- $vacuum_page_count = 0;
- my $last_pages_before_vacuum = $pages_before_vacuum;
- $pages_before_vacuum = get_pages_before_vacuum($table_info->{stats}{page_count}, $expected_page_count);
- if ($last_pages_before_vacuum != $pages_before_vacuum) {
- logger(LOG_WARNING, "Set pages/vacuum: %d.", $pages_before_vacuum);
- }
- }
- if ($to_page >= $table_info->{stats}{page_count}) {
- $to_page = $table_info->{stats}{page_count} - 1;
- }
- if ($to_page <= 1) {
- $to_page = 0;
- last;
- }
- my $last_pages_per_round = $pages_per_round;
- $pages_per_round = get_pages_per_round($table_info->{stats}{page_count}, $to_page);
- if ($last_pages_per_round != $pages_per_round) {
- logger(LOG_WARNING, "Set pages/round: %d.", $pages_per_round);
- }
- }
- if ($loop == 0) {
- logger(LOG_WARNING, "Maximum loops reached.");
- }
- if ($to_page > 0) {
- my $vacuum_time = time;
- sleep(1);
- vacuum($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $vacuum_time = time - $vacuum_time;
- $table_info->{stats} = get_size_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- if ($table_info->{stats}{page_count} > $to_page + $pages_per_round) {
- logger(LOG_NOTICE, "Vacuum final: cannot clean %d pages, %d pages left, duration %0.3f seconds.", ($table_info->{stats}{page_count} - $to_page - $pages_per_round), $table_info->{stats}{page_count}, $vacuum_time);
- } else {
- logger(LOG_NOTICE, "Vacuum final: %d pages left, duration %.3f seconds.", ($table_info->{stats}{page_count}||0), $vacuum_time);
- }
- }
- #if (not $self->{'_no_final_analyze'}) {
- my $analyze_time = time;
- analyze($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $analyze_time = time - $analyze_time;
- logger(LOG_NOTICE, "Analyze final: duration %.3f second.", $analyze_time);
- #}
- my $get_stat_time = time;
- $bloat_stats = get_bloat_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- $get_stat_time = time - $get_stat_time;
- logger(LOG_NOTICE,"Bloat statistics with pgstattuple: duration %.3f seconds.", $get_stat_time);
- $pages_before_vacuum = get_pages_before_vacuum($table_info->{stats}{page_count}, $expected_page_count);
- }
- my $will_be_skipped = (!$is_locked && ($is_skipped || $table_info->{stats}{page_count} < MINIMAL_COMPACT_PAGES || $bloat_stats->{free_percent} < MINIMAL_COMPACT_PERCENT));
- if (!$is_locked && ($print_reindex_queries || (!$no_reindex && (!$is_skipped || ($attempt == $max_retry_count) || (!$is_reindexed && $is_skipped && $attempt == 0))))) {
- $is_reindexed = (reindex_table($table_name, $schema_name, $db_name, $print_reindex_queries) || $is_reindexed);
- if (!$no_reindex) {
- $table_info->{stats} = get_size_stats($schema_name, $table_name);
- if ($DBI::err) {
- logger(LOG_ERROR, "Table handling interrupt.");
- return -1;
- }
- }
- }
- if (!$is_locked && !($is_skipped && !defined($is_reindexed))) {
- my $complete = (($will_be_skipped || $is_skipped) && (defined $is_reindexed ? $is_reindexed : 1));
- if ($complete) {
- logger(LOG_NOTICE, "Processing complete.");
- } else {
- logger(LOG_NOTICE, "Processing incomplete.");
- }
- if (defined $bloat_stats->{free_percent} && defined $bloat_stats->{effective_page_count} && $bloat_stats->{free_percent} > 0 && $table_info->{stats}{page_count} > $bloat_stats->{effective_page_count} && !$complete) {
- logger(LOG_WARNING, "Processing results: %d pages left (%d pages including toasts and indexes), size reduced by %s (%s including toasts and indexes) in total , approximately %d%% (%d pages) that is %s more were expected to be compacted after this attempt.",
- $table_info->{stats}{page_count},
- $table_info->{stats}{total_page_count},
- nice_size($table_info->{base_stats}{size} - $table_info->{stats}{size}),
- nice_size($table_info->{base_stats}{total_size} - $table_info->{stats}{total_size}),
- $bloat_stats->{free_percent},
- $table_info->{stats}{page_count} - $bloat_stats->{effective_page_count},
- nice_size($bloat_stats->{free_space})
- );
- } else {
- logger(LOG_WARNING, "Processing results: %d pages left (%d pages including toasts and indexes), size reduced by %s (%s including toasts and indexes) in total.",
- $table_info->{stats}{page_count},
- $table_info->{stats}{total_page_count},
- nice_size($table_info->{base_stats}{size} - $table_info->{stats}{size}),
- nice_size($table_info->{base_stats}{total_size} - $table_info->{stats}{total_size}),
- );
- }
- }
- return (($is_locked || $is_skipped || $will_be_skipped) && (defined $is_reindexed ? $is_reindexed : 1));
- }
#Main code
# Informational modes: print the requested text and terminate at once.
# The original exits with status 1 for all three; preserved for any
# wrappers that inspect the exit code.
for my $mode ([$show_version => \&show_version],
              [$show_help    => \&show_help],
              [$show_man     => \&show_man]) {
    next unless $mode->[0];
    $mode->[1]->();
    exit(1);
}
# Argument sanity checks: a target database/table (or --all) is
# required, and --no-reindex contradicts --initial-reindex.
if (!($db_name || $table_name || $all_db)) {
    not_enough_arguments();
    exit(1);
}
if ($no_reindex && $initial_reindex) {
    no_together_arguments('no_reindex', 'initial_reindex');
    exit(1);
}
# Build the list of databases to process. With --all we connect to
# template1 first, fetch the full database list, then disconnect.
my @dbs = ($db_name);
if ($all_db) {
    $db_name = 'template1';
    unless (db_connect($db_name, $db_host, $db_port, $db_user, $db_passwd)) {
        logger(LOG_ERROR, "Cannot get database list. Quit.");
        exit(0);
    }
    # Fixed: dropped a stray "@dbs =" that turned this into
    # "@dbs = my $dbs = get_databases;" — a useless transient assignment
    # of the array ref into @dbs, overwritten by "@dbs = @$dbs" below.
    my $dbs = get_databases;
    unless ($dbs) {
        logger(LOG_ERROR, "Interrupt processing.");
        exit(0);
    }
    @dbs = @$dbs;
    db_disconnect;
}
# Expand the comma-separated schema/table filter options into
# constant-time lookup hashes.
if ($only_schema) {
    %only_schemas = map { $_ => 1 } split /,/, $only_schema;
    # With exactly one schema requested, narrow the default schema too.
    $schema_name = $only_schema if keys(%only_schemas) == 1;
}
%excluded_schemas = map { $_ => 1 } split /,/, $exclude_schema if $exclude_schema;
%excluded_tables  = map { $_ => 1 } split /,/, $exclude_table  if $exclude_table;
# Main per-database processing loop. For each target database: connect,
# lower our backend's I/O priority when possible, verify pgstattuple is
# available, then repeatedly walk the table list (up to $max_retry_count
# attempts) retrying only the tables that did not complete.
my $databases_left;
my $is_something_processed;
foreach my $current_db_name (@dbs) {
    $table_info{$current_db_name} = {tables => {}, attempts => 0, size => 0, base_size => 0};
    $db_name = $current_db_name;
    my $tables_left;
    set_current_db_name($current_db_name);
    unset_current_schema_name_table_name;
    unless (db_connect($current_db_name, $db_host, $db_port, $db_user, $db_passwd)) {
        $databases_left++;
        next;
    }
    # Try to put the server backend serving us into the "idle" I/O class
    # (ionice -c 3); only attempted when we run as postgres/root, since
    # ionice on another user's process would fail anyway.
    my $ionice_made = undef;
    my $backend_pid = get_pg_backend_pid();
    if ($backend_pid) {
        my $user_login = $ENV{LOGNAME} || $ENV{USER} || getpwuid($<);
        if ($user_login eq 'postgres' || $user_login eq 'root') {
            my $errstr = `ionice -c 3 -p $backend_pid 2>/dev/stdout`;
            if ($errstr) {
                chomp $errstr;
                logger(LOG_WARNING, "Cannot set ionice 3 for the process. It is recommended to set ionice -c 3 for pgcompacttable. Error: %s", $errstr);
            } else {
                $ionice_made = 1;
            }
        }
        logger(LOG_WARNING, "Postgres backend pid: $backend_pid") if ($backend_pid && $backend_pid =~ /^\d+$/ && $backend_pid > 0);
    } else {
        logger(LOG_ERROR, "Cannot get backend pid from Postgres. Exiting...");
        exit(-1);
    }
    unless ($ionice_made) {
        logger(LOG_WARNING, "It is recommended to set ionice -c 3 for pgcompacttable: ionice -c 3 -p %d", $backend_pid);
    }
    set_session_replication_role;
    if ($DBI::err) {
        logger(LOG_ERROR, "Database handling interrupt.");
        db_disconnect($current_db_name);
        $databases_left++;
        next;
    }
    # NOTE(review): 'qiuet' looks like a typo for 'quiet', but it is used
    # consistently as a logger level token throughout the file — confirm
    # what logger() accepts before renaming it.
    unless (get_pgstattuple_schema_name) {
        logger('qiuet', "Skip handling database %s: pgstattuple extension is not found", $current_db_name);
        db_disconnect($current_db_name);
        next;
    }
    unless (create_clean_pages_function) {
        logger('qiuet', "Skip handling database %s: pgstattuple cannot create clean_pages function", $current_db_name);
        db_disconnect($current_db_name);
        $databases_left++;
        next;
    }
    # A single explicit schema.table target bypasses table discovery.
    my $database_tables = ($schema_name && $table_name) ? [{schemaname => $schema_name, tablename => $table_name}] : get_database_tables($current_db_name);
    unless ($database_tables && ref $database_tables eq 'ARRAY' && scalar(@$database_tables) > 0) {
        logger('qiuet', "Skip handling database %s: cannot find tables", $current_db_name);
        # Fixed: guard against get_database_tables() returning undef —
        # dereferencing @$database_tables below would die under strict refs.
        $database_tables = [] unless ($database_tables && ref $database_tables eq 'ARRAY');
    }
    for (my $attempt = 0; $attempt < $max_retry_count; $attempt++) {
        logger('qiuet', "Handling tables. Attempt %s", ($attempt + 1));
        $table_info{$current_db_name}{attempts}++;
        my @retry_idents = ();
        foreach my $current_ident (@$database_tables) {
            # Skip malformed entries and anything excluded by the
            # schema/table filter options.
            next if (!$current_ident || ref $current_ident ne 'HASH' || $excluded_schemas{$current_ident->{schemaname}} || ($only_schema && !$only_schemas{$current_ident->{schemaname}}) || $excluded_tables{"$current_ident->{schemaname}.$current_ident->{tablename}"});
            my $table_key = $current_ident->{schemaname}.$current_ident->{tablename};
            $table_info{$current_db_name}{tables}{$table_key}{current} = {} unless ($table_info{$current_db_name}{tables}{$table_key}{current} && ref $table_info{$current_db_name}{tables}{$table_key}{current} eq 'HASH');
            $is_something_processed = 1 unless ($is_something_processed);
            set_current_schema_name_table_name($current_ident->{schemaname}, $current_ident->{tablename});
            logger(LOG_NOTICE, "Start handling table %s.%s", $current_ident->{schemaname}, $current_ident->{tablename});
            # process() returns false when the table needs another pass;
            # such tables are queued for the next attempt.
            push @retry_idents, $current_ident unless process($current_ident->{schemaname}, $current_ident->{tablename}, $attempt, $table_info{$current_db_name}{tables}{$table_key}{current});
            logger(LOG_NOTICE, "Finish handling table %s.%s", $current_ident->{schemaname}, $current_ident->{tablename});
            # Remember the pre-processing sizes once (first attempt only)
            # so the final report can show the total reduction.
            if ($attempt == 0) {
                $table_info{$current_db_name}{tables}{$table_key}{final}{base_stats}{size} = $table_info{$current_db_name}{tables}{$table_key}{current}{base_stats}{size};
                $table_info{$current_db_name}{tables}{$table_key}{final}{base_stats}{total_size} = $table_info{$current_db_name}{tables}{$table_key}{current}{base_stats}{total_size};
            }
            $table_info{$current_db_name}{tables}{$table_key}{final}{stats}{size} = $table_info{$current_db_name}{tables}{$table_key}{current}{stats}{size};
            $table_info{$current_db_name}{tables}{$table_key}{final}{stats}{total_size} = $table_info{$current_db_name}{tables}{$table_key}{current}{stats}{total_size};
        }
        if (scalar @retry_idents > 0) {
            @$database_tables = @retry_idents;
        } else {
            undef $tables_left;
            last;
        }
        $tables_left = scalar(@retry_idents);
    }
    drop_clean_pages_function;
    unset_current_schema_name_table_name;
    $databases_left++ if ($tables_left);
    logger(LOG_WARNING, "Processing %scomplete%s.", ($tables_left ? 'in' : ''), ($tables_left ? ": $tables_left tables left" : ''));
    # Aggregate the per-table size reductions for this database.
    $table_info{$current_db_name}{size} = sum(map {$table_info{$current_db_name}{tables}{$_}{final}{base_stats}{size} - $table_info{$current_db_name}{tables}{$_}{final}{stats}{size}} keys(%{$table_info{$current_db_name}{tables}}));
    $table_info{$current_db_name}{total_size} = sum(map {$table_info{$current_db_name}{tables}{$_}{final}{base_stats}{total_size} - $table_info{$current_db_name}{tables}{$_}{final}{stats}{total_size}} keys(%{$table_info{$current_db_name}{tables}}));
    logger(LOG_ERROR, "Processing results: size reduced by %s (%s including toasts and indexes) in total.", nice_size($table_info{$current_db_name}{size}), nice_size($table_info{$current_db_name}{total_size}));
    db_disconnect($current_db_name);
}
unset_current_db_name;
# Overall completion status across all processed databases.
if ($databases_left) {
    logger(LOG_WARNING, "Processing incomplete: %d databases left.", $databases_left);
} else {
    my $total_attempts = sum(map { $table_info{$_}{attempts} } keys %table_info);
    logger(LOG_WARNING, "Processing complete: %d retries to process has been done", $total_attempts);
}
if ($is_something_processed) {
    # Per-database breakdown appended to the grand-total line; databases
    # with no measured reduction are omitted.
    my $dbases_message = "";
    for my $db (keys %table_info) {
        my $info = $table_info{$db};
        next unless $info && ref $info eq 'HASH' && $info->{size} && $info->{total_size};
        $dbases_message .= ", " . nice_size($info->{size}) . " (" . nice_size($info->{total_size}) . ") $db";
    }
    my $size_reduced  = sum(map { $table_info{$_}{size}       || 0 } keys %table_info) || 0;
    my $total_reduced = sum(map { $table_info{$_}{total_size} || 0 } keys %table_info) || 0;
    logger(LOG_ERROR, "Processing results: size reduced by %s (%s including toasts and indexes) in total%s.",
        nice_size($size_reduced),
        nice_size($total_reduced),
        $dbases_message
    );
}
1;
- =head1 NAME
- B<pgcompacttable> - PostgreSQL bloat reducing tool.
- =head1 SYNOPSIS
- pgcompacttable [OPTION...]
- =over 4
- =item General options:
- [-?mV] [(-q | -v LEVEL)]
- =item Connection options:
- [-h HOST] [-p PORT] [-U USER] [-W PASSWD] [-P PATH]
- =item Targeting options:
- (-a | -d DBNAME...) [-n SCHEMA...] [-t TABLE...] [-D DBNAME...]
- [-N SCHEMA...] [-T TABLE...]
- =back
- =head1 DESCRIPTION
- B<pgcompacttable> is a tool to reduce bloat for tables and indexes without
- heavy locks.
- =over 4
- =back
- pgstattuple must be installed. B<pgcompacttable> uses pgstattuple to get statistics.
- =head1 EXAMPLES
- Shows usage manual.
- pgcompacttable --man
- Compacts all the bloated tables in all the database in the cluster plus their bloated indexes. Prints additional progress information.
- pgcompacttable --all --verbose info
- Compacts all the bloated tables in the billing database and their bloated indexes excepts ones that are in the pgq schema.
- pgcompacttable --dbname billing --exclude-schema pgq
- =head1 OPTIONS
- =head2 General options
- =over 4
- =item B<-?>
- =item B<--help>
- Display short help.
- =item B<-m>
- =item B<--man>
- Display full manual.
- =item B<-V>
- =item B<--version>
- Print version.
- =item B<-q>
- =item B<--quiet>
- Quiet mode. Do not display any messages except error messages and total result.
- =item B<-v> LEVEL
- =item B<--verbose> LEVEL
- A verbosity level. One of C<warning>, C<notice>, C<info>. By default C<notice>. Display all the progress messages.
- =back
- =head2 Connection options
- The B<pgcompacttable> tries to connect to the database with the DBI Perl module.
- If some of the connection options is not specified the tool tries to
- get it from C<PGHOST>, C<PGPORT>, C<PGUSER>, C<PGPASSWORD> environment
- variables. If password is still unknown after that than it tries to
- get it from the password file that C<PGPASSFILE> refers to and if this
- file does not exist it tries to get it from C<HOME/.pgpass> file.
- =over 4
- =item B<-h> HOST
- =item B<--host> HOST
- A database host. By default C<localhost>.
- =item B<-p> PORT
- =item B<--port> PORT
- A database port. By default C<5432>.
- =item B<-U> USER
- =item B<--user> USER
- A database user. By default current system user is used (as returned by whoami).
- =item B<-W> PASSWD
- =item B<--password> PASSWD
- A password for the user.
- =back
- =head2 Targeting options
- Note that if you specified a database, schema or table that is not in the cluster it will be ignored. Redundant exclusions will be ignored too. All these options except C<--all> can be specified several times.
- =over 4
- =item B<-a>
- =item B<--all>
- Process all the databases in the cluster.
- =item B<-d> DBNAME
- =item B<--dbname> DBNAME
- A database to process. By default all the user databases of the instance are processed.
- =item B<-D> DBNAME
- =item B<--exclude-dbname> DBNAME
- A database to exclude from processing.
- =item B<-n> SCHEMA
- =item B<--schema> SCHEMA
- A schema to process. By default the 'public' schema is processed.
- =item B<-N> SCHEMA
- =item B<--exclude-schema> SCHEMA
- A schema to exclude from processing.
- =item B<-t> TABLE
- =item B<--table> TABLE
- A table to process. By default all the tables of the specified schema are processed.
- =item B<-T> TABLE
- =item B<--exclude-table> TABLE
- A table to exclude from processing.
- =back
- =head2 Options controlling the behaviour
- =over 4
- =item B<-R>
- =item B<--routine-vacuum>
- Turn on the routine vacuum. By default all the vacuums are off.
- =item B<-r>
- =item B<--no-reindex>
- Turn off reindexing of tables after processing.
- =item B<-i>
- =item B<--initial-reindex>
- Perform an initial reindex of tables before processing.
- =item B<-s>
- =item B<--print-reindex-queries>
- Print reindex queries. Useful if you want to perform manual
- reindex later.
- =item B<-f>
- =item B<--force>
- Try to compact even those tables and indexes that do not meet minimal bloat requirements.
- =item B<-E> RATIO
- =item B<--delay-ratio> RATIO
- A dynamic part of the delay between rounds is calculated as previous-round-time * delay-ratio. By default 0.
- =item B<-Q> Query
- =item B<--after-round-query> Query
- SQL statement to be called after each round against current database
- =item B<-o> COUNT
- =item B<--max-retry-count> COUNT
- A maximum number of retries in case of unsuccessful processing. By default 10.
- =item B<-x> COUNT
- =back
- =cut
- =head1 LICENSE AND COPYRIGHT
- Copyright (c) 2015 Maxim Boguk
- =head1 AUTHOR
- =over 4
- =item L<Maxim Boguk|mailto:maxim.boguk@gmail.com>
- =back
- =cut
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement