#!/usr/bin/php
<?php
// ============================================================
// stats_direction_01 업데이트 데몬 (완전 수정 버전)
// - 트랜잭션 기반 안전한 처리
// - 마지막 처리 ID 영속 저장 (재시작 시 이어서 처리)
// - 부분 실패 시 안전한 롤백 및 재시도
// ============================================================
// ------------------------------------------------------------
// Process-wide runtime setup for a long-running CLI daemon.
// ------------------------------------------------------------

// No execution-time limit and no memory cap for this process.
set_time_limit(0);
ini_set('memory_limit', '-1');

// date() calls in this script format timestamps in Korean local time.
date_default_timezone_set('Asia/Seoul');

// Refuse to run under any SAPI other than the command line.
if (PHP_SAPI !== 'cli') {
    fwrite(STDERR, "[ERROR] CLI only\n");
    exit(1);
}

// Loop-control flag; the signal handlers below flip it to false to request a stop.
$keepRunning = true;

// Timestamp of the last heartbeat; 0 makes the first interval check pass immediately.
$last_heartbeat_time = 0;

// Identity of this daemon instance: script file name (without extension) and OS process id.
$PID = getmypid();
$DAEMON_ID = pathinfo(__FILE__, PATHINFO_FILENAME);

// Register the stop handlers only when the pcntl extension and its signal constants exist.
if (function_exists('pcntl_signal') && defined('SIGTERM') && defined('SIGINT')) {
    $request_shutdown = function() use (&$keepRunning) {
        $keepRunning = false;
    };
    foreach ([SIGTERM, SIGINT] as $shutdown_signal) {
        pcntl_signal($shutdown_signal, $request_shutdown);
    }
}
// ============================================================
// 로깅
// ============================================================
/**
 * Emit one timestamped log line.
 *
 * The line has the form "[Y-m-d H:i:s] message\n". Regular messages go to
 * standard output via echo; error messages are written to STDERR.
 *
 * @param mixed $msg      Message text (converted to string when formatted).
 * @param bool  $is_error When true the line is written to STDERR instead of stdout.
 * @return void
 */
function log_message($msg, $is_error = false) {
    $line = sprintf("[%s] %s\n", date('Y-m-d H:i:s'), $msg);

    if (!$is_error) {
        echo $line;
    } else {
        fwrite(STDERR, $line);
    }

    // Push pending output out after every line.
    flush();
}
// ============================================================
// 데몬 상태 기록 (d_last_id 포함)
// ============================================================
/**
 * Upsert this daemon's row in upbit_data.daemon_record (best effort).
 *
 * Inserts a new row, or on a duplicate key refreshes pid, status and
 * heartbeat. On the update path d_last_id is only replaced when $last_id is
 * not NULL (COALESCE keeps the stored value otherwise); note that 0 is a
 * non-NULL value and therefore does overwrite the stored id.
 *
 * Failures are logged through log_message() and never propagated.
 *
 * @param object     $db      Connection exposing prepare(); a PDO handle in this script.
 * @param string     $id      Daemon identifier (d_id).
 * @param int        $pid     Process id to record.
 * @param string     $status  Status label to record (e.g. 'RUNNING', 'STOPPED').
 * @param int|null   $last_id Last processed source id, or null to keep the stored one.
 * @return void
 */
function update_daemon_status($db, $id, $pid, $status, $last_id = null) {
    // Each placeholder appears once in the statement, hence the *2 duplicates
    // for the ON DUPLICATE KEY UPDATE half.
    $sql = implode("\n", [
        "INSERT INTO upbit_data.daemon_record",
        "(d_id, d_category, d_pid, d_status, d_heartbeat, d_start_time, d_last_id)",
        "VALUES (:id, 'STATS_DIRECTION_01', :pid, :status, NOW(), NOW(), :last_id)",
        "ON DUPLICATE KEY UPDATE",
        "d_pid = :pid2,",
        "d_status = :status2,",
        "d_heartbeat = NOW(),",
        "d_last_id = COALESCE(:last_id2, d_last_id)",
    ]);

    $bindings = [
        ':id'       => $id,
        ':pid'      => $pid,
        ':pid2'     => $pid,
        ':status'   => $status,
        ':status2'  => $status,
        ':last_id'  => $last_id,
        ':last_id2' => $last_id,
    ];

    try {
        $db->prepare($sql)->execute($bindings);
    } catch (Exception $e) {
        log_message("Daemon Record 업데이트 실패: " . $e->getMessage(), true);
    }
}
// ============================================================
// 마지막 처리 ID 조회
// ============================================================
/**
 * Read the stored checkpoint (d_last_id) for a daemon.
 *
 * Returns 0 when no row exists, when the stored value is empty/zero, and
 * also when the lookup throws (the error is logged). Callers therefore cannot
 * distinguish "no checkpoint yet" from "lookup failed".
 *
 * @param object $db        Connection exposing prepare(); a PDO handle in this script.
 * @param string $daemon_id Daemon identifier (d_id) to look up.
 * @return int Stored id, or 0 as described above.
 */
function get_last_processed_id($db, $daemon_id) {
    try {
        $query = $db->prepare("SELECT d_last_id FROM upbit_data.daemon_record WHERE d_id = ?");
        $query->execute([$daemon_id]);
        $stored = $query->fetchColumn();
    } catch (Exception $e) {
        log_message("Last ID 조회 실패: " . $e->getMessage(), true);
        return 0;
    }

    // fetchColumn() yields false when there is no row; NULL/0/'' are treated the same way.
    if (!$stored) {
        return 0;
    }
    return (int)$stored;
}
// ============================================================
// DB 연결
// ============================================================
/**
 * Load the shared DB bootstrap file and return its PDO handle.
 *
 * Includes /home/www/DB/db_upbit.php (if present) in this function's scope
 * and accepts a PDO instance exposed as either $db_upbit or $pdo, preferring
 * $db_upbit. The handle is switched to exception error mode with associative
 * fetches before being returned.
 *
 * @return PDO|null The configured connection, or null when the file is
 *                  missing or exposes no PDO instance under those names.
 */
function loadDBConnection() {
    if (!file_exists('/home/www/DB/db_upbit.php')) {
        return null;
    }

    // Plain include (not include_once): the file is evaluated on every call.
    include '/home/www/DB/db_upbit.php';

    if (isset($db_upbit) && $db_upbit instanceof PDO) {
        $handle = $db_upbit;
    } elseif (isset($pdo) && $pdo instanceof PDO) {
        $handle = $pdo;
    } else {
        return null;
    }

    $handle->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $handle->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
    return $handle;
}
// ============================================================
// 메인 루프
// ============================================================
/**
 * Tell whether an exception means the MySQL connection itself is unusable.
 *
 * Checks the driver error code in PDOException::$errorInfo[1] against the
 * MySQL client codes 2002/2003 (cannot connect), 2006 (server has gone away)
 * and 2013 (lost connection during query), with a message-text fallback for
 * drivers that do not populate errorInfo.
 *
 * @param Exception $e Exception caught while talking to the database.
 * @return bool True when the connection should be re-established.
 */
function sd01_is_connection_lost($e) {
    if (!($e instanceof PDOException)) {
        return false;
    }
    $driver_code = (int)($e->errorInfo[1] ?? 0);
    if (in_array($driver_code, [2002, 2003, 2006, 2013], true)) {
        return true;
    }
    $message = $e->getMessage();
    return stripos($message, 'server has gone away') !== false
        || stripos($message, 'Lost connection') !== false;
}

/**
 * Fold one trade row into a market's running statistics.
 *
 * Pure calculation: takes the currently stored statistics row and one source
 * row and returns the named parameters for the UPDATE statement used by
 * sd01_apply_trade().
 *
 * @param array $old Current stats_direction_01 row for the market.
 * @param array $row Source row (market, trade_price, change_rate, tr_ask_bid, tr_trade_volume).
 * @return array Map of ':placeholder' => value for the UPDATE.
 */
function sd01_build_update_params($old, $row) {
    $price    = (double)$row['trade_price'];
    $rate_now = (double)$row['change_rate'];
    $vol      = (double)$row['tr_trade_volume'];
    // Any side other than the exact string 'BID' is counted as the sell side.
    $dir      = ($row['tr_ask_bid'] === 'BID') ? 1 : -1;
    $old_rate = (double)($old['change_rate'] ?? 0);

    // Change-rate direction versus the previously stored rate, and its streaks.
    $chg_dir      = $rate_now <=> $old_rate;
    $chg_dir_sum  = (int)($old['chg_dir_sum'] ?? 0) + $chg_dir;
    $chg_up       = ($chg_dir === 1 ? (int)($old['chg_up_streak'] ?? 0) + 1 : 0);
    $chg_down     = ($chg_dir === -1 ? (int)($old['chg_down_streak'] ?? 0) + 1 : 0);
    $chg_up_max   = max((int)($old['chg_up_streak_max'] ?? 0), $chg_up);
    $chg_down_max = max((int)($old['chg_down_streak_max'] ?? 0), $chg_down);

    // Running mean of the change rate.
    $old_chg_avg_cnt = (int)($old['chg_rate_avg_cnt'] ?? 0);
    $new_chg_avg_cnt = $old_chg_avg_cnt + 1;
    $old_chg_avg     = (double)($old['chg_rate_avg'] ?? $rate_now);
    $new_chg_avg     = (($old_chg_avg * $old_chg_avg_cnt) + $rate_now) / $new_chg_avg_cnt;

    // Buy/sell tick counters and the running mean of the trade price.
    $side_plus          = (int)($old['side_plus'] ?? 0) + ($dir === 1 ? 1 : 0);
    $side_minus         = (int)($old['side_minus'] ?? 0) + ($dir === -1 ? 1 : 0);
    $side_total         = (int)($old['side_total'] ?? 0) + $dir;
    $old_side_total_cnt = (int)($old['side_total_cnt'] ?? 0);
    $new_side_total_cnt = $old_side_total_cnt + 1;
    $old_side_avg       = (double)($old['side_avg'] ?? $price);
    $new_side_avg       = (($old_side_avg * $old_side_total_cnt) + $price) / $new_side_total_cnt;

    // Accumulated volume per side and the net of the two.
    $acc_bid_vol = (double)($old['acc_bid_volume'] ?? 0) + ($dir === 1 ? $vol : 0);
    $acc_ask_vol = (double)($old['acc_ask_volume'] ?? 0) + ($dir === -1 ? $vol : 0);

    return [
        ':price'          => $price,
        ':rate_now'       => $rate_now,
        ':side_plus'      => $side_plus,
        ':side_minus'     => $side_minus,
        ':side_total'     => $side_total,
        // The *_cnt columns receive only this tick's indicator (+1 / -1 / 0), not a running count.
        ':plus_display'   => ($dir === 1 ? 1 : 0),
        ':minus_display'  => ($dir === -1 ? -1 : 0),
        ':side_avg'       => $new_side_avg,
        ':side_total_cnt' => $new_side_total_cnt,
        ':bid_vol'        => $acc_bid_vol,
        ':ask_vol'        => $acc_ask_vol,
        ':net_vol'        => $acc_bid_vol - $acc_ask_vol,
        ':chg_dir'        => $chg_dir,
        ':chg_dir_sum'    => $chg_dir_sum,
        ':chg_up'         => $chg_up,
        ':chg_down'       => $chg_down,
        ':chg_up_max'     => $chg_up_max,
        ':chg_down_max'   => $chg_down_max,
        ':chg_avg'        => $new_chg_avg,
        ':chg_avg_cnt'    => $new_chg_avg_cnt,
        ':market'         => $row['market'],
    ];
}

/**
 * Apply one source row to stats_direction_01 inside its own transaction.
 *
 * Locks the market's statistics row (SELECT ... FOR UPDATE), recomputes the
 * statistics and writes them back. Any exception leaves the transaction open
 * for the caller to roll back.
 *
 * @param PDO   $db  Open connection.
 * @param array $row Source row to apply.
 * @return bool True when committed; false when the market has no statistics
 *              row (the transaction is rolled back and nothing is written).
 */
function sd01_apply_trade($db, $row) {
    $db->beginTransaction();

    $lock_stmt = $db->prepare(implode("\n", [
        'SELECT change_rate, chg_dir_sum, chg_up_streak, chg_down_streak,',
        'chg_up_streak_max, chg_down_streak_max, chg_rate_avg, chg_rate_avg_cnt,',
        'side_plus, side_minus, side_total, side_total_cnt,',
        'acc_bid_volume, acc_ask_volume, side_avg',
        'FROM stats_direction_01',
        'WHERE market = ?',
        'FOR UPDATE',
    ]));
    $lock_stmt->execute([$row['market']]);
    $old = $lock_stmt->fetch();
    if (!$old) {
        $db->rollBack();
        return false;
    }

    $update_stmt = $db->prepare(implode("\n", [
        'UPDATE stats_direction_01 SET',
        'trade_price = :price,',
        'change_rate = :rate_now,',
        'side_plus = :side_plus,',
        'side_minus = :side_minus,',
        'side_total = :side_total,',
        'side_plus_cnt = :plus_display,',
        'side_minus_cnt = :minus_display,',
        'side_avg = :side_avg,',
        'side_total_cnt = :side_total_cnt,',
        'acc_bid_volume = :bid_vol,',
        'acc_ask_volume = :ask_vol,',
        'net_vol = :net_vol,',
        'chg_dir = :chg_dir,',
        'chg_dir_sum = :chg_dir_sum,',
        'chg_up_streak = :chg_up,',
        'chg_down_streak = :chg_down,',
        'chg_up_streak_max = :chg_up_max,',
        'chg_down_streak_max = :chg_down_max,',
        'chg_rate_avg = :chg_avg,',
        'chg_rate_avg_cnt = :chg_avg_cnt,',
        'wr_datetime = NOW()',
        'WHERE market = :market',
    ]));
    $update_stmt->execute(sd01_build_update_params($old, $row));

    $db->commit();
    return true;
}

// ------------------------------------------------------------
// Main loop: connect, restore the checkpoint, then repeatedly pull new rows
// from daemon_upbit_coin_1s (by ascending id) and fold them into
// stats_direction_01, one transaction per row.
// ------------------------------------------------------------
$db_upbit = null;
$last_processed_id = 0;
$is_initialized = false;
$batch_size = 100; // rows fetched per query; a short batch means the backlog is drained

log_message("데몬 시작 (ID: {$DAEMON_ID}, PID: {$PID})");

while ($keepRunning) {
    try {
        // (Re)connect when there is no usable connection.
        if ($db_upbit === null) {
            $db_upbit = loadDBConnection();
            if ($db_upbit === null) {
                log_message("DB 연결 실패, 5초 후 재시도", true);
                sleep(5);
                continue;
            }
            $is_initialized = false;
            log_message("DB 연결 성공");
        }

        // Restore the checkpoint BEFORE the first heartbeat. The heartbeat writes
        // $last_processed_id into d_last_id, so sending it first on a fresh start
        // would overwrite the stored checkpoint with 0 and restart from the beginning.
        // After a reconnect the in-memory value may be ahead of the stored one, so
        // never move backwards.
        if (!$is_initialized) {
            $persisted_id = get_last_processed_id($db_upbit, $DAEMON_ID);
            $last_processed_id = max($last_processed_id, $persisted_id);
            $is_initialized = true;
            log_message("마지막 처리 ID 복구: {$last_processed_id}");
        }

        // Heartbeat, at most once per second.
        if (time() - $last_heartbeat_time >= 1) {
            update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'RUNNING', $last_processed_id);
            $last_heartbeat_time = time();
        }

        // Only markets that already have a statistics row are processed.
        $target_markets = $db_upbit->query("SELECT market FROM stats_direction_01")
            ->fetchAll(PDO::FETCH_COLUMN);
        if (empty($target_markets)) {
            usleep(500000);
            continue;
        }

        // The market list is fixed for this cycle, so build and prepare the fetch once.
        $placeholders = implode(',', array_fill(0, count($target_markets), '?'));
        $fetch_stmt = $db_upbit->prepare(implode("\n", [
            "SELECT id, market, trade_price, change_rate, tr_ask_bid, tr_trade_volume",
            "FROM daemon_upbit_coin_1s",
            "WHERE id > ? AND market IN ($placeholders)",
            "ORDER BY id ASC",
            "LIMIT " . $batch_size,
        ]));

        $processed_count = 0;
        while (true) {
            $fetch_stmt->execute(array_merge([$last_processed_id], $target_markets));
            $rows = $fetch_stmt->fetchAll(PDO::FETCH_ASSOC);
            if (empty($rows)) break;

            $batch_start_id = $last_processed_id;

            foreach ($rows as $row) {
                $current_id = (int)$row['id'];
                $market = $row['market'];
                try {
                    if (sd01_apply_trade($db_upbit, $row)) {
                        $processed_count++;
                    } else {
                        log_message("마켓 {$market} 통계 레코드 없음, 스킵 (ID: {$current_id})", true);
                    }
                    // Advance only after a commit or a deliberate skip.
                    $last_processed_id = $current_id;
                } catch (Exception $e) {
                    if (sd01_is_connection_lost($e)) {
                        // Not a problem with this row: leave the checkpoint where it is
                        // and let the outer handler reconnect, so the row is retried
                        // instead of being silently dropped.
                        log_message("DB 연결 끊김, 재연결 후 재시도 (ID: {$current_id}, Market: {$market})", true);
                        throw $e;
                    }
                    if ($db_upbit->inTransaction()) {
                        $db_upbit->rollBack();
                    }
                    log_message("처리 실패 (ID: {$current_id}, Market: {$market}): " . $e->getMessage(), true);
                    // Row-level failure: skip the row so one bad record cannot stall the daemon.
                    $last_processed_id = $current_id;
                }

                // Runs for every row, including skipped ones, so a stop request is seen promptly.
                if (function_exists('pcntl_signal_dispatch')) {
                    pcntl_signal_dispatch();
                }
                if (!$keepRunning) break;
            }

            if ($processed_count > 0) {
                log_message("배치 처리 완료: {$processed_count}건, 마지막 ID: {$last_processed_id}");
                $processed_count = 0;
            }
            // Persist whenever the checkpoint moved, even if every row was skipped.
            // NOTE: the checkpoint is stored outside the per-row transactions, so a
            // crash between a commit and this write replays those rows on restart.
            if ($last_processed_id !== $batch_start_id) {
                update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'RUNNING', $last_processed_id);
            }

            if (count($rows) < $batch_size) break;
            if (!$keepRunning) break;
        }

        // Idle for 0.5s between polling cycles; skip the wait once a stop was requested.
        if ($keepRunning) {
            usleep(500000);
        }
    } catch (PDOException $e) {
        log_message("DB 에러: " . $e->getMessage(), true);
        $db_upbit = null; // forces a reconnect on the next iteration
        sleep(5);
    } catch (Exception $e) {
        log_message("일반 에러: " . $e->getMessage(), true);
        sleep(1);
    }

    if (function_exists('pcntl_signal_dispatch')) {
        pcntl_signal_dispatch();
    }
}

// Shutdown: record the final state. A null id keeps the stored checkpoint
// untouched if it was never restored on this connection.
if ($db_upbit !== null) {
    update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'STOPPED', $is_initialized ? $last_processed_id : null);
}
log_message("데몬 종료 (ID: {$DAEMON_ID}, 마지막 처리 ID: {$last_processed_id})");