Can't run two AnyEvent timers at the same time?
I'm building a job queue manager that watches a folder for changes (inotify-style, via AnyEvent::Filesys::Notify). When a job file lands in the new folder it is moved to the progress folder, and if the job is not processed successfully the program moves it to the failed folder. Now I want to add a feature: once a job has been sitting in the failed folder for 10 minutes, the program should retry it by moving it back to the new folder.
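Roughly, this is what I want that retry step to do (just a sketch, not part of the program below; retry_old_failed_jobs and the directory arguments are illustrative):

use strict;
use warnings;
use File::Copy  qw(move);
use File::Slurp qw(read_dir);
use File::stat;

sub retry_old_failed_jobs {
    my ($failed_dir, $new_dir) = @_;
    for my $file ( read_dir($failed_dir) ) {
        # move back any job that has been sitting in failed/ for more than 10 minutes
        my $age = time - stat("$failed_dir/$file")->mtime;
        move( "$failed_dir/$file", "$new_dir/$file" ) if $age > 10 * 60;
    }
}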
Here's my code:
use strict;
use warnings;
package QueueManager;
use AnyEvent;
use AnyEvent::Filesys::Notify;
use Const::Fast;
use DDP;
use File::Basename;
use File::Copy;
use File::Remove qw(remove);
use File::Slurp;
use File::stat;
use List::Util qw(first);
use List::MoreUtils qw(natatime);
use Moo;
use MooX::Types::MooseLike::Base qw<Str Num InstanceOf ArrayRef HashRef Int>;
use Regexp::Common qw(net);
use v5.10.1;
with 'MooseX::Role::Loggable';
has notifier_interval => (
is => 'ro',
isa => Num,
default => sub {30.0},
);
has failed_interval => (
is => 'ro',
isa => Num,
default => sub{60*10},
);
has number_of_jobs_to_process => (
is => 'ro',
isa => Int,
default => sub{20},
);
has failed_timer => (
is => 'ro',
writer => 'set_failed_timer',
);
has notifier => (
is => 'ro',
isa => InstanceOf['AnyEvent::Filesys::Notify'],
writer => 'set_notifier',
);
has add_output_file => (
is => 'ro',
isa => Str,
default => sub{'.add_route.output'},
);
has del_output_file => (
is => 'ro',
isa => Str,
default => sub{'.del_route.output'},
);
has jobs_folder_path => (
is => 'ro',
isa => Str,
default => sub{'queue_manager/jobs'},
);
has job_folders => (
is => 'ro',
isa => HashRef,
lazy => 1,
builder => '_build_job_folders',
);
has jobs => (
is => 'rw',
isa => ArrayRef,
lazy => 1,
default => sub{ [] },
clearer => '_clear_jobs',
);
has queue => (
is => 'rw',
isa => ArrayRef,
lazy => 1,
default => sub{ [] },
clearer => '_clear_queue',
);
has added_jobs => (
is => 'rw',
isa => ArrayRef[HashRef],
lazy => 1,
default => sub { [] },
clearer => '_clear_added_jobs',
);
has deleted_jobs => (
is => 'rw',
isa => ArrayRef[HashRef],
lazy => 1,
default => sub { [] },
clearer => '_clear_deleted_jobs',
);
has routing_table => (
is => 'rw',
isa => ArrayRef[HashRef],
lazy => 1,
builder => '_build_routing_table',
);
has email_contact => (
is => 'ro',
isa => Str,
default => sub{'some@user.com'},
);
has '+log_to_file' => ( default => sub{1} );
has '+log_path' => ( default => sub{'queue_manager/logs'} );
has '+log_file' => ( default => sub{'queue_manager.log'} );
has '+log_to_stdout' => ( default => sub{1} );
sub _build_job_folders {
my $self = shift;
my $jobs_folder_path = $self->jobs_folder_path;
my %ret_val = (
new => "$jobs_folder_path/new",
progress => "$jobs_folder_path/progress",
failed => "$jobs_folder_path/failed",
);
return \%ret_val;
}
sub _build_routing_table {
my $self = shift;
my @ret_val;
my @routing_table = `ip route show`;
foreach my $line (@routing_table) {
# e.g. "1.1.1.1 via 2.2.2.2 dev eth0 proto zebra"
my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto zebra\s+$/;
if (defined ($ip_address) and defined ($next_hop)) {
push @ret_val, { ip_address => $ip_address, next_hop => $next_hop };
}
}
return \@ret_val;
}
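# Start the repeating failed-jobs timer and the filesystem watcher on the new/ folder.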
sub run {
my $self = shift;
my %job_folders = %{ $self->job_folders };
$self->log("Queue Manager is running...");
$self->set_failed_timer(
AnyEvent->timer(
interval => $self->failed_interval,
cb => sub {
$self->post_process_failed_jobs();
},
)
);
$self->set_notifier(
AnyEvent::Filesys::Notify->new(
dirs => [ $job_folders{'new'} ],
interval => $self->notifier_interval,
cb => sub {
my (@events) = @_;
for my $event (@events) {
if ($event->is_created and basename($event->path) !~ /^\./) {
push @{ $self->jobs }, $event->path;
}
}
my $queue_timer;
$queue_timer = AE::timer $self->notifier_interval, 0, sub {
$self->process_new_jobs();
undef $queue_timer;
};
},
)
);
}
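# Move every collected job from new/ to progress/, queue it, then start processing.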
sub process_new_jobs {
my $self = shift;
my $progress_path;
my %job_folders = %{ $self->job_folders };
my @new_jobs = @{ $self->jobs };
foreach my $new_job (@new_jobs) {
my $file_name = basename($new_job);
$progress_path = "$job_folders{'progress'}/$file_name";
move($new_job, $progress_path);
$self->enter_queue($progress_path);
}
$self->_clear_jobs;
$self->start_new_jobs();
}
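# Called from failed_timer: for now it only logs how long each file has been sitting in failed/.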
sub post_process_failed_jobs {
my $self = shift;
my %job_folders = %{ $self->job_folders };
my @failed_files = read_dir($job_folders{'failed'});
if (@failed_files) {
foreach my $failed_file (@failed_files) {
my $file_stat = stat("$job_folders{'failed'}/$failed_file");
my $result = time - $file_stat->mtime;
$self->log("Modified " . int($result/60) . " mins ago");
}
}
}
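# Parse one job file, record it in added_jobs/deleted_jobs and push an add/del entry onto the queue.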
sub enter_queue {
my $self = shift;
my ($job_to_process) = shift;
my $file_name = basename($job_to_process);
my $args = read_file($job_to_process);
chomp $args;
for ($job_to_process) {
when (/add/) {
my ($ip_address, $next_hop) = split(/ /, $args);
push @{ $self->added_jobs }, {
ip_address => $ip_address,
next_hop => $next_hop,
file_name => $file_name,
};
push @{ $self->queue }, "add:$ip_address:$next_hop";
}
when (/del/) {
my ($ip_address) = $args;
push @{ $self->deleted_jobs }, {
ip_address => $ip_address,
file_name => $file_name,
};
push @{ $self->queue }, "del:$ip_address";
}
}
}
sub start_new_jobs {
my $self = shift;
my $iterator = natatime $self->number_of_jobs_to_process, @{ $self->queue };
while ( my @values = $iterator->() ) {
my $arguments_line = join(' ', @values);
$self->log("Processing: $arguments_line");
}
$self->_clear_queue;
$self->jobs_post_process();
}
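# Verify each job against the routing table: failures go to failed/ (with an email), successes are removed from progress/.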
sub jobs_post_process {
my $self = shift;
my %job_folders = %{ $self->job_folders };
p $self->added_jobs;
foreach my $added_job ( @{ $self->added_jobs } ) {
my ($ip_address, $next_hop, $file_name) =
($added_job->{'ip_address'},
$added_job->{'next_hop'},
$added_job->{'file_name'});
my $is_in_routing_table =
$self->check_is_in_routing_table($ip_address);
if (not $is_in_routing_table) {
$self->log("FAILED job: $file_name");
move("$job_folders{'progress'}/$file_name",
"$job_folders{'failed'}/$file_name");
$self->send_email($self->add_output_file, $added_job);
}
else {
$self->log("Finished processing job: $file_name");
remove("$job_folders{'progress'}/$file_name");
}
}
$self->_clear_added_jobs;
foreach my $deleted_job ( @{ $self->deleted_jobs } ) {
my ($ip_address, $file_name) =
($deleted_job->{'ip_address'},
$deleted_job->{'file_name'});
my $is_in_routing_table =
$self->check_is_in_routing_table($ip_address);
if ($is_in_routing_table) {
$self->log("FAILED job: $file_name");
move("$job_folders{'progress'}/$file_name",
"$job_folders{'failed'}/$file_name");
$self->send_email($self->del_output_file, $deleted_job);
}
else {
$self->log("Finished processing job: $file_name");
remove("$job_folders{'progress'}/$file_name");
}
}
$self->_clear_deleted_jobs;
}
sub check_is_in_routing_table {
my $self = shift;
my ($ip_address) = shift;
$self->routing_table([ $self->get_routing_table ]);
my @addresses = @{ $self->routing_table };
my ($comparable_ip) = $ip_address =~ /($RE{net}{IPv4})\/32$/;
my $ret_val = first { $_->{'ip_address'} eq $comparable_ip } @addresses;
return $ret_val;
}
sub get_routing_table {
my $self = shift;
my @routing_table = `ip route show`;
my @ret_val;
foreach my $line (@routing_table) {
# e.g. "1.1.1.1 via 2.2.2.2 dev eth0 proto zebra"
my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto zebra\s+$/;
if (defined ($ip_address) and defined ($next_hop)) {
push @ret_val, { ip_address => $ip_address, next_hop => $next_hop };
}
}
return @ret_val;
}
1;
My problem is that failed_timer runs only once, when the program starts, and never fires again. What have I done wrong?
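For comparison, this tiny standalone script behaves the way I expected failed_timer to behave; the 5-second interval and the warn in the callback are just placeholders for testing:

use strict;
use warnings;
use AnyEvent;

my $interval = 5;    # 600 (10 minutes) in the real program

# keep the watcher in a variable so it stays alive
my $repeating_timer = AnyEvent->timer(
    after    => $interval,
    interval => $interval,
    cb       => sub { warn "tick at " . time . "\n" },
);

AnyEvent->condvar->recv;    # run the event loop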