Skip to content

Instantly share code, notes, and snippets.

@pcdinh
Created March 29, 2010 07:27
Show Gist options
  • Select an option

  • Save pcdinh/347554 to your computer and use it in GitHub Desktop.

Select an option

Save pcdinh/347554 to your computer and use it in GitHub Desktop.
#!/usr/local/php/bin/php
<?php
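/**
* Fetches stock price data for one market date and updates the database with it.
*
* Arguments, in the order used by the examples below (each may be omitted):
*   1. "enable" or anything else - fetch the latest price data over HTTP (omitted: enabled)
*   2. "enable" or anything else - update the database from the saved data (omitted: enabled)
*   3. the market date to process, written as YYYY/MM/DD (omitted: today)
*
* @example Run from terminal
* root@wvb: ~: ./run_pricedata.php enable enable 2007/12/21
* root@wvb: ~: ./run_pricedata.php disable enable 2007/12/21
*/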
// Runtime bootstrap.
// The timezone is set first so that every date() call below - including the one
// that names the error log file - is computed in Asia/Tokyo rather than in
// whatever default timezone the PHP installation happens to have.
date_default_timezone_set('Asia/Tokyo');
// All relative paths of this script are resolved against the directory it is started from
$baseDirectory = getcwd();
// Send PHP errors to one log file per day: <cwd>/logs/error.YYYYMMDD.log
ini_set('log_errors', '1');
ini_set('error_log', $baseDirectory.DIRECTORY_SEPARATOR.'logs/error.'.date('Ymd').'.log');
// No execution time limit
set_time_limit(0);
/**
 * Class autoloader: maps a class name to classes/<ClassName>.php under the
 * current working directory and loads that file.
 *
 * It is registered through spl_autoload_register() because the magic
 * __autoload() function is deprecated since PHP 7.2 and rejected by PHP 8.
 *
 * @param string $class Name of the class that PHP is trying to resolve.
 * @return void
 */
function pricedataAutoload($class)
{
    require_once getcwd().DIRECTORY_SEPARATOR.'classes/'.$class.'.php';
}
spl_autoload_register('pricedataAutoload');
// Default switches; the command-line handling further down may override the first two.
// Fetch the latest price data over HTTP
$enableHttpFetching = true;
// Process the saved data offline and write it to the database
$enableDatabaseUpdate = true;
// true: keep each ticker's pricing data serialized; false: save it as plain text lines
$enableSerializedSavingResult = true;

// Timing statistics: one file for the whole run, one for the individual stages
$appStatFile = 'run_pricedata_finaltimecounter.txt';
$moduleStatFile = 'run_pricedata_moduletimecounter.txt';

// From here on PHP errors are handed to ErrorHandler::onError()
set_error_handler(array('ErrorHandler', 'onError'));

// Remember when the run started and announce it in both stat files and on the console
$appStart = time();
$startDateTime = date('Y.m.d H:i:s');
$startNotice = "Application get started at $startDateTime \n";
foreach (array($appStatFile, $moduleStatFile) as $statFile)
{
    file_put_contents($statFile, "------ \n ".$startNotice, FILE_APPEND);
}
echo $startNotice;
/*
 * Main run:
 *   1. read the command-line options (fetch switch, database switch, date);
 *   2. when fetching is enabled, split the ticker pool into chunks, fork one
 *      child process per chunk to fetch and save the prices, and wait for them;
 *   3. when the database update is enabled, run the database controller for the date;
 *   4. record the elapsed times in the stat files.
 * Any exception that reaches this level is written to the error log and echoed.
 */
try
{
    // Command line arguments
    $cmd = new GetOptController();
    $cmd->run();
    $date = $cmd->getOption(3);
    $priceToFetch = $cmd->getOption(1);
    $dbUpdate = $cmd->getOption(2);

    // A missing switch leaves the feature on; otherwise only 'enable' turns it on
    $enableHttpFetching = (null === $priceToFetch) ? true : ('enable' == $priceToFetch);
    $enableDatabaseUpdate = (null === $dbUpdate) ? true : ('enable' == $dbUpdate);

    if (null === $date)
    {
        // No command line argument - use default
        $date = date('Y/m/d');
        echo "Default date is used: $date\n";
    }
    elseif (1 !== preg_match('#\A(\d{4})/(\d{2})/(\d{2})\z#', $date, $dateParts)
        || false === checkdate((int) $dateParts[2], (int) $dateParts[3], (int) $dateParts[1]))
    {
        // Must be digits in YYYY/MM/DD layout and an existing calendar day
        $message = 'Command line argument is not correct. Expected: YYYY/MM/DD. Actual: '.$date."\n";
        echo $message;
        throw new Exception($message);
    }

    // Print date to fetch and the effective switches.
    // The ternaries are parenthesised because "." binds tighter than "?:";
    // without the parentheses the label is swallowed and "Enabled" is always printed.
    echo "Start to fetch data for date: $date \n";
    echo "Latest data retrieval: ".((true === $enableHttpFetching) ? "Enabled\n" : "Disabled\n");
    echo "Database update: ".((true === $enableDatabaseUpdate) ? "Enabled\n" : "Disabled\n");

    // Base name of the file that holds the ticker data
    $fileName = 'previous_checked_tickers';
    // HTTP Content Retriever
    $reader = UrlReader_File::getInstance();
    // ticker/price fetching engine
    $fetcher = new StockPriceFetcher($reader);
    // Http-fetched data/price temporary storage
    $persister = new StockPricePersister_File();
    // The controller of the pool of tickers (corporate only)
    $master = TickerMaster::getInstance($fileName.'.txt');
    $market = new MarketIndexModel();
    // All market indices values (not corporate tickers but can be used in the same manner)
    $marketIndices = $market->findAll();

    if (true === $enableHttpFetching)
    {
        // Break the pool of tickers into chunks
        $master->load();
        $master->addMarketIndex($marketIndices);
        $master->setSegmentSize(100);
        $dataChunks = $master->breakDown();
        // How many chunks do we have
        $chunkCount = count($dataChunks);
        echo "Total page to fetch: {$master->getTotalTicker()} \n";
        $counter = array();
        // PIDs of all forked children; created once so that it survives the whole loop
        $children = array();
        // Start time of the operation (for benchmarking only)
        $start = time();

        // Assign each chunk to its own child process
        for ($i = 0; $i < $chunkCount; $i++)
        {
            $pid = pcntl_fork();
            if (-1 === $pid)
            {
                throw new Exception("Could not fork the process");
            }
            elseif (0 === $pid)
            {
                // This part is only executed in the child
                $childProcessId = posix_getpid();
                echo "Child is running: $childProcessId. \n";
                // Each chunk is saved to its own result file
                $persister->setFile(dirname(__FILE__).'/results/'.$fileName.$i.'.txt');
                // Iterator for this chunk of tickers
                $iterator = new TickerIterator($dataChunks[$i]);
                $startTime = time();
                if (false === isset($counter[$childProcessId]))
                {
                    $counter[$childProcessId] = 0;
                }

                // Iterate through the chunk of tickers; a failure on one ticker
                // is logged and does not stop the remaining ones
                for ($iterator->rewind(); $iterator->valid(); $iterator->next())
                {
                    try
                    {
                        // Current ticker
                        $tickerRelated = $iterator->current();
                        $ticker = $iterator->key();
                        $fetcher->setTicker($ticker);
                        $wvbSecurityId = $tickerRelated['sec_perm_id'];
                        $fetcher->setSecurityPermId($wvbSecurityId);
                        $fetcher->setProcessId($childProcessId);
                        // make connection to the http resource and extract the pricing data
                        $fetcher->run();
                        // retrieve the result as an array
                        $data = $fetcher->getData();
                        // Notify the process completed for this ticker
                        $fetcher->report();
                        // Clean up internal data
                        $fetcher->cleanUp();

                        // No entry for the requested date: keep the raw data and report it
                        if (false === isset($data[$date]))
                        {
                            $persister->saveRawData($data, $ticker);
                            throw new DateNonExistentException("This market date does not exist in the stock data table");
                        }

                        echo "Child process $childProcessId will save price data for ticker $ticker \n";
                        // persist this data for later use
                        if (false === $enableSerializedSavingResult)
                        {
                            $persister->saveData($data[$date], $date);
                        }
                        else
                        {
                            // If we want to save the serialized data instead of text lines
                            $persister->append($data[$date], $date);
                        }
                        $counter[$childProcessId]++;
                    }
                    catch (DateNonExistentException $ex)
                    {
                        error_log("Exception: ".$ex->getMessage().":$ticker\n");
                    }
                    catch (Exception $ex)
                    {
                        error_log("Exception: ".$ex->getMessage().".\n Trace: ".$ex->getTraceAsString());
                    }
                } // end for

                echo "\nChild process: $childProcessId has done its job in ".(time() - $startTime)."(s). Total tickers fetched: ".$counter[$childProcessId]."\n";
                // If we want to save the serialized data instead of text lines
                if (true === $enableSerializedSavingResult)
                {
                    try
                    {
                        $persister->serializeAndSave();
                    }
                    catch (Exception $ex)
                    {
                        echo "Error occur when saving the data retrieved by child process $childProcessId: ".$ex->getMessage()."\n";
                    }
                }
                // Exit current child when its task is completed. The chunk number is
                // handed to the parent as the exit status.
                // NOTE: exit statuses are 8 bits wide, so the number wraps above 255 chunks.
                exit($i);
            }
            else
            {
                // This part is only executed in the parent: pcntl_fork() returned
                // the PID of the child that was just created
                $children[] = $pid;
                // posix_getpid() is this (parent) process; posix_getppid() would
                // report the process that launched the script instead
                echo "\nParent process: ".posix_getpid()."\n";
            } // end if check pid
        } // end for

        // Parent process: wait for every forked child. Driven by the number of
        // children actually created, so nothing undefined is read when the
        // ticker pool produced no chunk at all.
        $remaining = count($children);
        if ($remaining > 0)
        {
            while ($remaining > 0)
            {
                // Blocks until any child has exited; this also gets rid of the zombies
                pcntl_waitpid(-1, $status);
                $val = pcntl_wexitstatus($status);
                echo "\nParent process: Child no. $val now returns. Continue to wait for ".($remaining - 1)." children\n";
                --$remaining;
            }
            unset($dataChunks);
            $elapsed = time() - $start;
            echo "\nAll child processes are completed in $elapsed (s)\n";
            file_put_contents($moduleStatFile, date('Y-m-d H:i:s').": $elapsed \n", FILE_APPEND);
        }
    } // end if $enableHttpFetching

    /**
     * Update the processed data into the database, either right after the
     * fetch or from the data saved by an earlier run.
     */
    if (true === $enableDatabaseUpdate)
    {
        echo (true === $enableHttpFetching)
            ? "Start to update the database after data http fetching is completed \n"
            : "Start to update the database using offline data. \n";
        $start = time();
        $controller = new StockPricePersister_DatabaseController($date);
        $controller->run($enableSerializedSavingResult);
        $elapsed = time() - $start;
        file_put_contents($moduleStatFile, date('Y-m-d H:i:s').": $elapsed \n", FILE_APPEND);
    }

    // Total run time
    $elapsed = time() - $appStart;
    echo "Elapsed time: $elapsed (s)\n";
    file_put_contents($appStatFile, date('Y-m-d H:i:s').": ".$elapsed."\n", FILE_APPEND);
}
catch (Exception $ex)
{
    error_log("Exception: ".$ex->getMessage().".\n Trace: ".$ex->getTraceAsString());
    echo $ex->getMessage()."\n";
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment