#!/usr/bin/env perl
use strict;
use warnings FATAL => 'all';
use Gearman::Worker;
use JSON;
use LWP::UserAgent;
use Data::Dumper;
use Getopt::Long;
use Digest::MD5 qw/md5_hex/;
# Service manager and gearman server information
# TODO: Using an options file would be better
my ( $agent_username, $agent_password ) = ( 'agent',     'mysql' );
my ( $srv_mgr_host,   $srv_mgr_port )   = ( '127.0.0.1', '48080' );
my $gearmand_host = '127.0.0.1:4730';
#TODO Find a better way to send the drizzle hostname
my $hostname = 'localhost';
# REST API base urls. Every one has the same credentials/host/port prefix
# and differs only in the path below /v2/rest/instance/, so all six are
# produced from a single format string.
my (
    $base_statement_uri,
    $base_agent_uri,
    $base_host_uri,
    $base_db_server_uri,
    $base_db_server_variables_uri,
    $base_stmt_example_uri,
  )
  = map {
    sprintf 'http://%s:%s@%s:%s/v2/rest/instance/%s/',
      $agent_username, $agent_password, $srv_mgr_host, $srv_mgr_port, $_
  } qw(
    mysql/statementsummary
    mysql/agent
    os/Host
    mysql/server
    mysql/variables
    mysql/statement
  );
# Debug verbosity, read from the DEBUG environment variable.
# The rest of the file compares it numerically ($DEBUG > N); because warnings
# are fatal here, an unset or non-numeric value would kill the script at the
# first such comparison, so anything that is not a plain integer becomes 0.
#TODO: Implement DEBUG
my $DEBUG =
  ( defined $ENV{DEBUG} && $ENV{DEBUG} =~ /\A\d+\z/ ) ? $ENV{DEBUG} : 0;
my $parent;
#my $agent_uuid;
# Values supplied on the command line (filled in by GetOptions).
my $server_display_name;
my $server_uuid;
# Number of log lines handled so far; starts at 1.
my $counter = 1;
# Selects which URI / parent the REST helpers build for the next request.
my $uri_type;
my $server_host_uuid;
# Database name and statement hash of the query currently being sent.
my $g_dbname;
my $md5_hash;
# Per-query statistics accumulated between flushes, keyed by query text.
my $querybase = {};
# Read the required identifiers from the command line.
GetOptions(
    #'agentuuid=s' => \$agent_uuid,
    'serverdisplayname=s' => \$server_display_name,
    'serveruuid=s'        => \$server_uuid,
    'serverhostuuid=s'    => \$server_host_uuid,
);

#Sanity check
# Every option is mandatory. usage() only prints, so stop here explicitly:
# carrying on would feed undefined values into the URI builders below.
#usage(1) if ( !defined ( $agent_uuid ) );
if ( !defined $server_display_name ) {
    usage(1);
    exit 1;
}
if ( !defined $server_uuid ) {
    usage(2);
    exit 2;
}
if ( !defined $server_host_uuid ) {
    usage(3);
    exit 3;
}

#Disabled for now, the service manager needs some work
#init_transport();

# Send the one-time initialisation requests, then start serving gearman jobs
# (init_gearman_worker loops forever).
init_host();
init_db_server();
init_db_server_variables();
init_stmt_summary();
init_stmt_example();
init_gearman_worker();
#########################################################################
# Usage help
#
# Prints which required option was missing (if any) followed by an example
# command line. Only prints; it does not terminate the program.
#
# Parameters:
#   $error_code - 1: --serverdisplayname missing
#                 2: --serveruuid missing
#                 3: --serverhostuuid missing
#                 anything else (or omitted): only the example is printed
#########################################################################
sub usage {
    my ($error_code) = @_;
    $error_code = 0 unless defined $error_code;

    #print "\n--agentuuid=... parameter was missing" if $error_code == 1;
    print "\n--serverdisplayname=... parameter was missing" if $error_code == 1;
    print "\n--serveruuid=... parameter was missing"        if $error_code == 2;
    print "\n--serverhostuuid=... parameter was missing"    if $error_code == 3;
    print "\nDEBUG=[1-5] perl worker.pl \\ ";
    #print "\n --agentuuid=\"11111111-1111-1111-1111-111111111111\" \\ ";
    print "\n --serverdisplayname=\"server name\" \\ ";
    print "\n --serveruuid=\"22222222-2222-2222-2222-222222222222\" \\ ";
    print "\n --serverhostuuid=\"ssh:{11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11}\" \n\n";
}
#########################################################################
#########################################################################
#########################################################################
# Connect to the gearman job server and serve "drizzlelog" jobs, the function
# name the logging gearman plugin submits. Each job's argument is passed to
# assemble_queries. This sub never returns.
sub init_gearman_worker {
    my $on_drizzlelog = sub {
        my ($job) = @_;
        return assemble_queries( $job->arg );
    };

    my $gearman = Gearman::Worker->new;
    $gearman->job_servers($gearmand_host);
    $gearman->register_function( drizzlelog => $on_drizzlelog );

    while (1) {
        $gearman->work;
    }
}
# At different times we use different uri for the REST API
#
# Builds the resource URI for the request kind currently selected by the
# file-level $uri_type, using $server_uuid / $server_host_uuid and, for the
# per-statement kinds, $g_dbname and $md5_hash.
#
# Any arguments passed by callers are ignored; all inputs are file-level
# variables.
#
# Returns the URI string, or nothing (undef in scalar context) for
# "init_transport" (its URI is disabled) and for unrecognised types.
sub uri {
    if ( $uri_type eq "stmt_summary" ) {
        return $base_statement_uri . $server_uuid . "." . $g_dbname . "." . $md5_hash;
    }
    if ( $uri_type eq "init_stmt_summary" ) {
        return $base_statement_uri . $server_uuid;
    }
    if ( $uri_type eq "init_transport" ) {
        #return $base_agent_uri . $agent_uuid;
        return;
    }
    if ( $uri_type eq "init_host" ) {
        return $base_host_uri . $server_host_uuid;
    }
    if ( $uri_type eq "init_db_server" ) {
        return $base_db_server_uri . $server_uuid;
    }
    if ( $uri_type eq "init_db_server_variables" ) {
        return $base_db_server_variables_uri . $server_uuid;
    }
    if ( $uri_type eq "init_stmt_example" ) {
        return $base_stmt_example_uri . $server_uuid;
    }
    if ( $uri_type eq "stmt_example" ) {
        return $base_stmt_example_uri . $server_uuid . "." . $g_dbname . "." . $md5_hash;
    }

    # Unknown request kind: no URI.
    return;
}
# We add a "parent" attribute or not based on what kind of
# data we are sending.
#
# Parameters:
#   %rep - the payload as key/value pairs (e.g. attributes => {...} or
#          values => {...}).
#
# Depending on the file-level $uri_type a "parent" entry is added to the
# payload before it is serialised with to_json (pretty, utf8).
#
# Returns the JSON text. The JSON is returned explicitly: the debug print
# below is the last statement, and without the return its result would be
# handed to the caller instead of the JSON.
sub make_transform {
    my (%rep) = @_;

    if (   $uri_type eq "stmt_summary"
        || $uri_type eq "init_stmt_summary"
        || $uri_type eq "init_db_server_variables" )
    {
        $rep{parent} = '/instance/mysql/server/' . $server_uuid;
    }
    elsif ( $uri_type eq "init_db_server" ) {
        $rep{parent} = '/instance/os/Host/' . $server_host_uuid;
    }
    elsif ( $uri_type eq "init_stmt_example" ) {
        $rep{parent} = '/instance/mysql/statementsummary/' . $server_uuid;
    }
    elsif ( $uri_type eq "stmt_example" ) {
        $rep{parent} = '/instance/mysql/statementsummary/' . $server_uuid . "." . $g_dbname . "." . $md5_hash;
    }
    # Any other type: no parent.

    # build up the perl hash representation of an inventory transform
    my $json = to_json( \%rep, { pretty => 1, utf8 => 1 } );
    print Dumper($json) if $DEBUG > 2;

    return $json;
}
#Did our PUT worked?
#
# Issues a GET for $uri asking for JSON, so the result of a preceding PUT can
# be inspected.
#
# Parameters:
#   $uri - resource to fetch.
#
# On success the response body is printed when $DEBUG > 1; on failure the
# HTTP status line is always printed.
sub get_resource {
    my ($uri) = @_;

    my $ua = LWP::UserAgent->new;
    $ua->agent("MemPut/0.1");

    my $req = HTTP::Request->new( GET => $uri );
    $req->header( Accept => "application/json" );

    # Pass request to the user agent and get a response back
    my $res = $ua->request($req);
    #print Dumper($res);

    # Check the outcome of the response
    if ( $res->is_success ) {
        print $res->content if $DEBUG > 1;
        print "\n"          if $DEBUG > 1;
    }
    else {
        print $res->status_line, "\n";
    }
}
# PUT a JSON document to the REST API.
#
# Parameters:
#   $uri  - target resource
#   $json - request body, sent as application/json
#
# The target is echoed when $DEBUG > 4. A failed request prints its status
# line; a successful one prints the response body when $DEBUG > 1.
sub put_transform {
    my ( $uri, $json ) = @_;

    my $client = LWP::UserAgent->new;
    $client->agent("MemPut/0.1");

    if ( $DEBUG > 4 ) {
        print "\n\n" . $uri . "\n\n\n";
    }

    my $request = HTTP::Request->new( PUT => $uri );
    $request->content_type('application/json');
    #if ( defined( $agent_uuid ) ) {
    #$request->header( "X-MySQL-Monitor-Agent-UUID" => $agent_uuid );
    #}
    $request->content($json);

    # Hand the request to the user agent and wait for the answer
    my $response = $client->request($request);
    #print Dumper($response);

    # Report the outcome
    if ( !$response->is_success ) {
        print $response->status_line, "\n";
    }
    else {
        print $response->content if $DEBUG > 1;
    }
}
# Send one query-analyzer payload: build the URI for the current request
# kind, serialise the payload, PUT it, then GET the resource back.
#
# Parameters:
#   %payload - key/value pairs handed on to make_transform.
sub send_json_data {
    my %payload = @_;

    # uri() works from the file-level state; the arguments mirror what it uses
    my $target = uri( $g_dbname, $md5_hash );

    put_transform( $target, make_transform(%payload) );
    get_resource($target);
}
#Simple way to convert literal query into a
#canonical representation
#TODO: Expand this
#
# Parameters:
#   $query - the literal query text.
#
# Replaces every single digit that stands between two spaces, and the number
# after "limit ", with a "?" placeholder.
#
# Returns the rewritten query; the caller's string is not modified.
sub canonicalize {
    my ($query) = @_;

    $query =~ s/\ \d\ /\ ? \ /g;
    $query =~ s/limit \d+/limit ?/g;

    return $query;
}
#Split the log entry the logging plugin sends.
#Group queries by query text.
#Every $interval number of queries, send them up
#
# Parameters:
#   $line - one log record: comma separated fields, where a comma preceded
#           by a backslash is part of a field and not a separator.
#
# Side effects: updates the file-level $querybase and $counter. When
# $counter is a multiple of $interval, every accumulated query is sent twice
# through send_json_data (first as "stmt_summary", then as "stmt_example")
# and removed from $querybase; $uri_type, $g_dbname and $md5_hash are set
# for those calls.
#
# Returns the value $counter had before it was incremented.
sub assemble_queries {
    my ($line) = @_;
    my $interval = 4;

    # Field order of a log record. Not every field is used below; they are
    # all named to keep the positions readable.
    my (
        $getmicrotime,      $session_id,    $query_id,
        $dbname,            $query_text,    $query_type,
        $time_session_conn, $t_exec_time,   $exec_time_a_locks,
        $rows_returned,     $rows_exam,     $num_tmp_tlbs,
        $count_warnings
    ) = split( /(?<!\\),/, $line );

    print "\nLine => " . $line . "" if $DEBUG > 4;
    my $query = canonicalize($query_text);
    print "\nQuery =>" . $query . "\n" if $DEBUG > 4;

    # Statistics are grouped under the canonical query text.
    my $stats = ( $querybase->{$query} ||= {} );

    # substr(.., 1, -1) drops the first and last character of the field
    # (presumably the surrounding quotes -- confirm against the plugin output).
    $stats->{database} = substr( $dbname, 1, -1 );
    $stats->{text}     = substr( $query,  1, -1 );
    $stats->{count}++;
    $stats->{rows}      += $rows_returned;
    $stats->{exec_time} += $t_exec_time;
    print "Clean query => " . $stats->{text} . "\n" if $DEBUG > 4;

    if ( defined( $stats->{max_rows} ) ) {
        $stats->{max_rows} = $rows_returned if ( $rows_returned > $stats->{max_rows} );
        $stats->{min_rows} = $rows_returned if ( $rows_returned < $stats->{min_rows} );
        $stats->{max_exec_time} = $t_exec_time if ( $t_exec_time > $stats->{max_exec_time} );
        $stats->{min_exec_time} = $t_exec_time if ( $t_exec_time < $stats->{min_exec_time} );
    }
    else {
        # First occurrence of this query since the last flush.
        $stats->{max_rows}      = $rows_returned;
        $stats->{min_rows}      = $rows_returned;
        $stats->{max_exec_time} = $t_exec_time;
        $stats->{min_exec_time} = $t_exec_time;
    }

    #print "Counter: " . $counter . "\n";
    #print "size of hash: " . keys( %{$stats} ) . ".\n";
    #print $query . "\n\n";
    if ( ( $counter % $interval ) == 0 ) {
        #print STDERR "Writing quan packets ($counter queries sent)\n";
        # keys() returns a copy of the key list, so deleting entries inside
        # the loop is safe.
        foreach my $key ( keys %{$querybase} ) {
            my $entry = $querybase->{$key};

            #print $entry->{text} . "\n";
            $md5_hash = md5_hex( $entry->{text} );
            $g_dbname = $entry->{database};
            print "Sending: " . $entry->{text} . "\n" if $DEBUG > 0;

            $uri_type = "stmt_summary";
            my %rep1 = ( values => $entry );
            send_json_data(%rep1);

            ##
            ## Query example
            ##
            # The summary-only statistics are dropped before the same entry
            # is reused as the example payload.
            delete @{$entry}{qw(max_rows min_rows max_exec_time min_exec_time count)};

            # NOTE(review): the example text and connection id come from the
            # log line being processed right now, for every query flushed in
            # this batch -- confirm this is intended.
            $entry->{text}          = $query_text;
            $entry->{connection_id} = $session_id;

            %rep1     = ( values => $entry );
            $uri_type = "stmt_example";
            send_json_data(%rep1);

            #print "Deleting: " . $entry->{text} . "\n";
            delete $querybase->{$key};
        }
    }

    my $handled_before = $counter;
    $counter++;
    return $handled_before;
}
#TODO: This is actually missing on the Service Manager
#
# Describes the transport to the service manager: first the attribute types,
# then the initial values. Each payload is PUT and then read back.
# Its call is currently commented out at the top of the file, and uri()
# builds no URI for "init_transport".
sub init_transport {
    $uri_type = "init_transport";
    my $uri = uri();

    ## Declare the attribute types
    my %rep = (
        attributes => {
            "name"    => "string",
            "version" => "string",
            "host_id" => "string"
        }
    );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);

    ## Send initial values
    # NOTE(review): the "values" key matches how assemble_queries sends its
    # data; confirm against the service manager API.
    %rep = (
        values => {
            "name"    => "perl transporter",
            "version" => "0.3",
            "host_id" => $server_host_uuid
        }
    );
    $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
#Send basic info related to the drizzle host
#
# Sends the host's attribute types, then its initial values (the configured
# $hostname). Each payload is PUT and then read back.
sub init_host {
    $uri_type = "init_host";
    my $uri = uri();

    ## Declare the attribute types
    my %rep = ( attributes => { "name" => "string" } );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);

    ## Send initial values
    # NOTE(review): the "values" key matches how assemble_queries sends its
    # data; confirm against the service manager API.
    %rep = ( values => { "name" => $hostname } );
    $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
#basic info describing the drizzle daemon
#
# Sends the server's attribute types, then its initial values (display name
# from the command line, connected/reachable set to "1"). Each payload is
# PUT and then read back.
sub init_db_server {
    $uri_type = "init_db_server";
    my $uri = uri();

    ## Declare the attribute types
    my %rep = (
        attributes => {
            "displayname"      => "string",
            "server.connected" => "long",
            "server.reachable" => "long"
        }
    );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);

    ## Send initial values
    # NOTE(review): the "values" key matches how assemble_queries sends its
    # data; confirm against the service manager API.
    %rep = (
        values => {
            "displayname"      => $server_display_name,
            "server.connected" => "1",
            "server.reachable" => "1"
        }
    );
    $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
#This should be all values from show global variables
#
# Sends the attribute types of the server variables, then hard-coded initial
# values for port and version. Each payload is PUT and then read back.
sub init_db_server_variables {
    $uri_type = "init_db_server_variables";
    my $uri = uri();

    ## Declare the attribute types
    my %rep = (
        attributes => {
            "port"    => "long",
            "version" => "string"
        }
    );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);

    ## Send initial values
    # The value hash needs a key of its own; without it the list has an odd
    # number of elements, which is fatal under the file's warning settings.
    # NOTE(review): "values" matches how assemble_queries sends its data;
    # confirm against the service manager API.
    %rep = (
        name   => $server_uuid,
        values => {
            "port"    => "9939",
            "version" => "7.0.2"
        }
    );
    $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
# Declare the attribute types of the query analyzer summary table, PUT the
# declaration and read it back.
sub init_stmt_summary {
    $uri_type = "init_stmt_summary";
    my $uri = uri();

    # Attribute names grouped by the type they are declared with
    my @long_fields = qw(
      avg_exec_time count exec_time
      max_exec_time max_rows
      min_exec_time min_rows
      rows
    );
    my @string_fields = qw( database query_type text text_hash warnings );

    my %attribute_type = (
        ( map { $_ => "long" } @long_fields ),
        ( map { $_ => "string" } @string_fields ),
    );

    my %rep  = ( attributes => \%attribute_type );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
# Declare the attribute types of a statement example (shown on the Query
# popup -> example tab), PUT the declaration and read it back.
sub init_stmt_example {
    $uri_type = "init_stmt_example";
    my $uri = uri();

    # Attribute names grouped by the type they are declared with
    my @long_fields = qw( connection_id errors exec_time rows );
    my @string_fields =
      qw( comment database explain_plan query_type text warnings );

    my %attribute_type = (
        ( map { $_ => "long" } @long_fields ),
        ( map { $_ => "string" } @string_fields ),
    );

    my %rep  = ( attributes => \%attribute_type );
    my $json = make_transform(%rep);
    #print Dumper($json);
    put_transform( $uri, $json );
    get_resource($uri);
}
1;
__END__
I do hope this article has been useful in getting to know how the Drizzle replication system passes the Command message around within a single server. You got to see the replicator and applier plugin APIs and example implementations of those APIs in the form of the FilteredReplicator and CommandLog classes. Next, we'll be looking in detail at the Command Log itself, its format, and writing/reading from it.
quick degree | honorary degrees