CORE TERMINAL
올드보이 & 마리아 백업

업비트 API 플랫폼 실시간 수집 데몬 - 업그레이드 버전 Ver.2.0

DATE: 2026-03-03 11:06
분류 정보
핵심 항목
내용
* 업비트 API 플랫폼 실시간 수집 데몬 - 업그레이드 버전 Ver.2.0
1. 기존 코드에 속도 암세포 코드 제거
2. 병렬식 심볼 요청구조
추가 내용
#!/usr/bin/php
<?php
/**
 * ============================================================
 * 업비트 전체 시세 수집 CLI 데몬
 * - curl_multi 병렬 호출 방식
 * - gnu DB 루프 밖 1회 연결
 * - bulk INSERT 한방 쿼리
 * - echo/flush 전부 제거
 * ============================================================
 */

error_reporting(E_ALL);
ini_set(\\\'display_errors\\\', 1);
date_default_timezone_set(\\\'Asia/Seoul\\\');

$DAEMON_ID = pathinfo(__FILE__, PATHINFO_FILENAME);

if (php_sapi_name() !== \\\'cli\\\') {
    exit;
}

function get_db_connection() {
    try {
        $db_upbit = null;
        @include \\\'/home/www/DB/db_upbit.php\\\';
        if (!($db_upbit instanceof PDO)) return null;
        $pdo = $db_upbit;
        $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        return $pdo;
    } catch (Throwable $e) {
        return null;
    }
}

function get_gnu_connection() {
    try {
        $db_gnu  = null;
        $pdo_gnu = null;
        $pdo     = null;
        @include \\\'/home/www/DB/db_gnu.php\\\';
        if ($db_gnu instanceof PDO) {
            $db_gnu->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            return $db_gnu;
        }
        if ($pdo_gnu instanceof PDO) {
            $pdo_gnu->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            return $pdo_gnu;
        }
        if ($pdo instanceof PDO) {
            $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            return $pdo;
        }
        return null;
    } catch (Throwable $e) {
        return null;
    }
}

// ============================================================
// curl_multi 병렬 호출
// ============================================================
function http_multi_get(array $urls): array {
    $mh      = curl_multi_init();
    $handles = [];

    foreach ($urls as $key => $url) {
        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT        => 8,
            CURLOPT_USERAGENT      => \\\'upbit-ghost\\\',
            CURLOPT_SSL_VERIFYPEER => false,
        ]);
        curl_multi_add_handle($mh, $ch);
        $handles[$key] = $ch;
    }

    $running = null;
    do {
        curl_multi_exec($mh, $running);
        curl_multi_select($mh);
    } while ($running > 0);

    $results = [];
    foreach ($handles as $key => $ch) {
        $raw = curl_multi_getcontent($ch);
        $results[$key] = $raw ? json_decode($raw, true) : null;
        curl_multi_remove_handle($mh, $ch);
        curl_close($ch);
    }
    curl_multi_close($mh);

    return $results;
}

$api = require \\\'/home/www/DB/upbit_api_url.php\\\';
$API_TICKER    = $api[\\\'ticker\\\'] . \\\'?markets=\\\';
$API_ORDERBOOK = \\\'https://api.upbit.com/v1/orderbook?markets=\\\';
$API_TRADES    = \\\'https://api.upbit.com/v1/trades/ticks?count=1&market=\\\';

$pdo     = get_db_connection();
$pdo_gnu = get_gnu_connection();
$server_ip = \\\'CLI_DAEMON\\\';

$cycle_count         = 0;
$last_market_refresh = 0;
$krw                 = [];

$stmt_hb   = null;
$stmt_kill = null;
$stmt_stop = null;
$stmt_best = null;

while (true) {
    $cycle_count++;

    try {
        $reconnected = false;
        if (!$pdo) {
            $pdo = get_db_connection();
            $reconnected = true;
        } else {
            try { $pdo->query(\\\"SELECT 1\\\"); }
            catch (Throwable $e) {
                $pdo = get_db_connection();
                $reconnected = true;
            }
        }
        if ($reconnected) {
            $stmt_hb = $stmt_kill = $stmt_stop = $stmt_best = null;
        }

        if ($pdo) {
            if (!$stmt_hb) {
                $stmt_hb = $pdo->prepare(\\\"
                    INSERT INTO daemon_record (
                        d_id, d_category, d_pid, d_status,
                        d_heartbeat, d_ip, d_start_time, d_memo, d_kill_flag
                    )
                    VALUES (
                        :id, \\\'UPBIT\\\', :pid, \\\'RUNNING\\\',
                        NOW(), :ip, NOW(), \\\'UPBIT BOARD ONLY TICKER GHOST\\\', 0
                    )
                    ON DUPLICATE KEY UPDATE
                        d_pid       = VALUES(d_pid),
                        d_status    = \\\'RUNNING\\\',
                        d_heartbeat = NOW(),
                        d_ip        = VALUES(d_ip),
                        d_memo      = VALUES(d_memo)
                \\\");
            }
            if (!$stmt_kill) {
                $stmt_kill = $pdo->prepare(\\\"SELECT d_kill_flag FROM daemon_record WHERE d_id = :id LIMIT 1\\\");
            }
            if (!$stmt_stop) {
                $stmt_stop = $pdo->prepare(\\\"
                    UPDATE daemon_record
                    SET d_status=\\\'STOPPED\\\', d_heartbeat=NOW(), d_pid=0
                    WHERE d_id=:id
                \\\");
            }

            $stmt_hb->execute([\\\':id\\\' => $DAEMON_ID, \\\':pid\\\' => getmypid(), \\\':ip\\\' => $server_ip]);

            $stmt_kill->execute([\\\':id\\\' => $DAEMON_ID]);
            $kill_flag = (int)($stmt_kill->fetchColumn() ?: 0);
            if ($kill_flag === 1) {
                $stmt_stop->execute([\\\':id\\\' => $DAEMON_ID]);
                exit(0);
            }

            // 종목 갱신 (1초마다)
            if (time() - $last_market_refresh >= 1 || empty($krw)) {
                $tmp = [];
                if (!$pdo_gnu) {
                    $pdo_gnu   = get_gnu_connection();
                    $stmt_best = null;
                } else {
                    try { $pdo_gnu->query(\\\"SELECT 1\\\"); }
                    catch (Throwable $e) {
                        $pdo_gnu   = get_gnu_connection();
                        $stmt_best = null;
                    }
                }
                if ($pdo_gnu instanceof PDO) {
                    if (!$stmt_best) {
                        $stmt_best = $pdo_gnu->prepare(\\\"
                            SELECT wr_subject FROM g5_write_daemon_kind_upbit
                            WHERE (x2_run = 1 OR x2_run = \\\'1\\\')
                        \\\");
                    }
                    $stmt_best->execute();
                    $best_rows = $stmt_best->fetchAll(PDO::FETCH_COLUMN);
                    if (is_array($best_rows)) {
                        foreach ($best_rows as $sym) {
                            $sym = strtoupper(trim((string)$sym));
                            if ($sym === \\\'\\\') continue;
                            if (strpos($sym, \\\'-\\\') === false) {
                                $sym = \\\'KRW-\\\' . $sym;
                            }
                            if (strpos($sym, \\\'KRW-\\\') !== 0) continue;
                            if (!in_array($sym, $tmp, true)) $tmp[] = $sym;
                        }
                    }
                }
                if ($tmp) {
                    $krw = $tmp;
                    $last_market_refresh = time();
                } else {
                    $krw = [];
                    sleep(10);
                    continue;
                }
            }

            if (!$krw) {
                sleep(10);
                continue;
            }

            $at = date(\\\'Y-m-d H:i:s\\\');
            $ms = (int)(microtime(true) * 1000);

            // 티커 + 호가 동시 호출, trades는 종목별 개별 URL
            $market_str = implode(\\\',\\\', $krw);
            $multi_urls = [
                \\\'ticker\\\'    => $API_TICKER    . $market_str,
                \\\'orderbook\\\' => $API_ORDERBOOK . $market_str,
            ];
            foreach ($krw as $sym) {
                $multi_urls[\\\"trades__{$sym}\\\"] = $API_TRADES . urlencode($sym);
            }
            $raw = http_multi_get($multi_urls);

            $tks = $raw[\\\'ticker\\\'];
            $obs = $raw[\\\'orderbook\\\'];

            // trades 맵 구성
            $tr_map = [];
            foreach ($krw as $sym) {
                $tr_res = $raw[\\\"trades__{$sym}\\\"] ?? null;
                if (is_array($tr_res) && isset($tr_res[0])) {
                    $tr_map[$sym] = $tr_res[0];
                }
            }

            if (!$tks || !is_array($tks)) {
                sleep(3);
                continue;
            }

            // 호가 맵 구성
            $ob_map = [];
            if (is_array($obs)) {
                foreach ($obs as $o) {
                    if (isset($o[\\\'market\\\'])) {
                        $ob_map[$o[\\\'market\\\']] = $o;
                    }
                }
            }

            // bulk INSERT
            $placeholders = [];
            $params       = [];
            foreach ($tks as $i => $t) {
                $market = $t[\\\'market\\\'] ?? \\\'\\\';
                if ($market === \\\'\\\') continue;

                $ob = $ob_map[$market] ?? [];
                $tr = $tr_map[$market] ?? [];

                $placeholders[] = \\\"(
                    :market_{$i}, :trade_date_{$i}, :trade_time_{$i}, :trade_date_kst_{$i}, :trade_time_kst_{$i},
                    :opening_price_{$i}, :high_price_{$i}, :low_price_{$i}, :trade_price_{$i}, :prev_closing_price_{$i},
                    :change_{$i}, :change_price_{$i}, :change_rate_{$i}, :signed_change_price_{$i}, :signed_change_rate_{$i},
                    :trade_volume_{$i}, :acc_trade_volume_{$i}, :acc_trade_volume_24h_{$i}, :acc_trade_price_{$i}, :acc_trade_price_24h_{$i},
                    :highest_52_week_price_{$i}, :highest_52_week_date_{$i}, :lowest_52_week_price_{$i}, :lowest_52_week_date_{$i},
                    :collected_at_{$i}, :collected_ms_{$i},
                    :ob_timestamp_{$i}, :ob_total_ask_size_{$i}, :ob_total_bid_size_{$i}, :ob_units_{$i},
                    :ob_collected_at_{$i}, :ob_collected_ms_{$i},
                    :tr_trade_timestamp_{$i}, :tr_trade_price_{$i}, :tr_trade_volume_{$i}, :tr_ask_bid_{$i},
                    :tr_trade_date_utc_{$i}, :tr_trade_time_utc_{$i}, :tr_trade_date_kst_{$i}, :tr_trade_time_kst_{$i},
                    :tr_collected_at_{$i}, :tr_collected_ms_{$i},
                    :day_of_week_{$i}, :korean_name_{$i}
                )\\\";

                $params[\\\"market_{$i}\\\"]                = $market;
                $params[\\\"trade_date_{$i}\\\"]            = $t[\\\'trade_date\\\']              ?? \\\'\\\';
                $params[\\\"trade_time_{$i}\\\"]            = $t[\\\'trade_time\\\']              ?? \\\'\\\';
                $params[\\\"trade_date_kst_{$i}\\\"]        = $t[\\\'trade_date_kst\\\']          ?? \\\'\\\';
                $params[\\\"trade_time_kst_{$i}\\\"]        = $t[\\\'trade_time_kst\\\']          ?? \\\'\\\';
                $params[\\\"opening_price_{$i}\\\"]         = $t[\\\'opening_price\\\']           ?? 0;
                $params[\\\"high_price_{$i}\\\"]            = $t[\\\'high_price\\\']              ?? 0;
                $params[\\\"low_price_{$i}\\\"]             = $t[\\\'low_price\\\']               ?? 0;
                $params[\\\"trade_price_{$i}\\\"]           = $t[\\\'trade_price\\\']             ?? 0;
                $params[\\\"prev_closing_price_{$i}\\\"]    = $t[\\\'prev_closing_price\\\']      ?? 0;
                $params[\\\"change_{$i}\\\"]                = $t[\\\'change\\\']                  ?? \\\'\\\';
                $params[\\\"change_price_{$i}\\\"]          = $t[\\\'change_price\\\']            ?? 0;
                $params[\\\"change_rate_{$i}\\\"]           = $t[\\\'change_rate\\\']             ?? 0;
                $params[\\\"signed_change_price_{$i}\\\"]   = $t[\\\'signed_change_price\\\']     ?? 0;
                $params[\\\"signed_change_rate_{$i}\\\"]    = $t[\\\'signed_change_rate\\\']      ?? 0;
                $params[\\\"trade_volume_{$i}\\\"]          = $t[\\\'trade_volume\\\']            ?? 0;
                $params[\\\"acc_trade_volume_{$i}\\\"]      = $t[\\\'acc_trade_volume\\\']        ?? 0;
                $params[\\\"acc_trade_volume_24h_{$i}\\\"]  = $t[\\\'acc_trade_volume_24h\\\']    ?? 0;
                $params[\\\"acc_trade_price_{$i}\\\"]       = $t[\\\'acc_trade_price\\\']         ?? 0;
                $params[\\\"acc_trade_price_24h_{$i}\\\"]   = $t[\\\'acc_trade_price_24h\\\']     ?? 0;
                $params[\\\"highest_52_week_price_{$i}\\\"] = $t[\\\'highest_52_week_price\\\']   ?? 0;
                $params[\\\"highest_52_week_date_{$i}\\\"]  = $t[\\\'highest_52_week_date\\\']    ?? \\\'\\\';
                $params[\\\"lowest_52_week_price_{$i}\\\"]  = $t[\\\'lowest_52_week_price\\\']    ?? 0;
                $params[\\\"lowest_52_week_date_{$i}\\\"]   = $t[\\\'lowest_52_week_date\\\']     ?? \\\'\\\';
                $params[\\\"collected_at_{$i}\\\"]          = $at;
                $params[\\\"collected_ms_{$i}\\\"]          = $ms;
                $params[\\\"tr_trade_timestamp_{$i}\\\"]    = $tr[\\\'timestamp\\\']         ?? 0;
                $params[\\\"tr_trade_price_{$i}\\\"]        = $tr[\\\'trade_price\\\']       ?? 0;
                $params[\\\"tr_trade_volume_{$i}\\\"]       = $tr[\\\'trade_volume\\\']      ?? 0;
                $params[\\\"tr_ask_bid_{$i}\\\"]            = $tr[\\\'ask_bid\\\']           ?? \\\'\\\';
                $params[\\\"tr_trade_date_utc_{$i}\\\"]     = $tr[\\\'trade_date_utc\\\']    ?? \\\'\\\';
                $params[\\\"tr_trade_time_utc_{$i}\\\"]     = $tr[\\\'trade_time_utc\\\']    ?? \\\'\\\';
                $params[\\\"tr_trade_date_kst_{$i}\\\"]     = $t[\\\'trade_date_kst\\\']     ?? \\\'\\\';
                $params[\\\"tr_trade_time_kst_{$i}\\\"]     = $t[\\\'trade_time_kst\\\']     ?? \\\'\\\';
                $params[\\\"tr_collected_at_{$i}\\\"]       = $at;
                $params[\\\"tr_collected_ms_{$i}\\\"]       = $ms;
                $params[\\\"ob_timestamp_{$i}\\\"]          = $ob[\\\'timestamp\\\']              ?? 0;
                $params[\\\"ob_total_ask_size_{$i}\\\"]     = $ob[\\\'total_ask_size\\\']         ?? 0;
                $params[\\\"ob_total_bid_size_{$i}\\\"]     = $ob[\\\'total_bid_size\\\']         ?? 0;
                $params[\\\"ob_units_{$i}\\\"]              = isset($ob[\\\'orderbook_units\\\'])
                    ? json_encode($ob[\\\'orderbook_units\\\'], JSON_UNESCAPED_UNICODE)
                    : null;
                $params[\\\"ob_collected_at_{$i}\\\"]       = $at;
                $params[\\\"ob_collected_ms_{$i}\\\"]       = $ms;
                $params[\\\"day_of_week_{$i}\\\"]           = (int)date(\\\'w\\\');
                $params[\\\"korean_name_{$i}\\\"]           = \\\'\\\';
            }

            if (!empty($placeholders)) {
                $sql = \\\"
                    INSERT INTO daemon_upbit_Ticker (
                        market, trade_date, trade_time, trade_date_kst, trade_time_kst,
                        opening_price, high_price, low_price, trade_price, prev_closing_price,
                        `change`, change_price, change_rate, signed_change_price, signed_change_rate,
                        trade_volume, acc_trade_volume, acc_trade_volume_24h, acc_trade_price, acc_trade_price_24h,
                        highest_52_week_price, highest_52_week_date, lowest_52_week_price, lowest_52_week_date,
                        collected_at, collected_ms,
                        ob_timestamp, ob_total_ask_size, ob_total_bid_size, ob_units,
                        ob_collected_at, ob_collected_ms,
                        tr_trade_timestamp, tr_trade_price, tr_trade_volume, tr_ask_bid,
                        tr_trade_date_utc, tr_trade_time_utc, tr_trade_date_kst, tr_trade_time_kst,
                        tr_collected_at, tr_collected_ms,
                        day_of_week, korean_name
                    ) VALUES \\\" . implode(\\\',\\\', $placeholders) . \\\"
                    ON DUPLICATE KEY UPDATE
                        trade_date            = VALUES(trade_date),
                        trade_time            = VALUES(trade_time),
                        trade_date_kst        = VALUES(trade_date_kst),
                        trade_time_kst        = VALUES(trade_time_kst),
                        opening_price         = VALUES(opening_price),
                        high_price            = VALUES(high_price),
                        low_price             = VALUES(low_price),
                        trade_price           = VALUES(trade_price),
                        prev_closing_price    = VALUES(prev_closing_price),
                        `change`              = VALUES(`change`),
                        change_price          = VALUES(change_price),
                        change_rate           = VALUES(change_rate),
                        signed_change_price   = VALUES(signed_change_price),
                        signed_change_rate    = VALUES(signed_change_rate),
                        trade_volume          = VALUES(trade_volume),
                        acc_trade_volume      = VALUES(acc_trade_volume),
                        acc_trade_volume_24h  = VALUES(acc_trade_volume_24h),
                        acc_trade_price       = VALUES(acc_trade_price),
                        acc_trade_price_24h   = VALUES(acc_trade_price_24h),
                        highest_52_week_price = VALUES(highest_52_week_price),
                        highest_52_week_date  = VALUES(highest_52_week_date),
                        lowest_52_week_price  = VALUES(lowest_52_week_price),
                        lowest_52_week_date   = VALUES(lowest_52_week_date),
                        collected_at          = VALUES(collected_at),
                        collected_ms          = VALUES(collected_ms),
                        tr_trade_timestamp    = VALUES(tr_trade_timestamp),
                        tr_trade_price        = VALUES(tr_trade_price),
                        tr_trade_volume       = VALUES(tr_trade_volume),
                        tr_ask_bid            = VALUES(tr_ask_bid),
                        tr_trade_date_utc     = VALUES(tr_trade_date_utc),
                        tr_trade_time_utc     = VALUES(tr_trade_time_utc),
                        tr_trade_date_kst     = VALUES(tr_trade_date_kst),
                        tr_trade_time_kst     = VALUES(tr_trade_time_kst),
                        tr_collected_at       = VALUES(tr_collected_at),
                        tr_collected_ms       = VALUES(tr_collected_ms),
                        ob_timestamp          = VALUES(ob_timestamp),
                        ob_total_ask_size     = VALUES(ob_total_ask_size),
                        ob_total_bid_size     = VALUES(ob_total_bid_size),
                        ob_units              = VALUES(ob_units),
                        ob_collected_at       = VALUES(ob_collected_at),
                        ob_collected_ms       = VALUES(ob_collected_ms),
                        day_of_week           = VALUES(day_of_week),
                        korean_name           = VALUES(korean_name)
                \\\";
                $stmt = $pdo->prepare($sql);
                $stmt->execute($params);
            }

        } else {
            sleep(5);
        }

        if ($cycle_count % 50 === 0 && function_exists(\\\'gc_collect_cycles\\\')) {
            gc_collect_cycles();
        }

    } catch (Throwable $e) {
        sleep(3);
    }

    sleep(30);
}
최근 "데몬" 데이터
# 업비트 # API # 플랫폼 # 실시간 # 수집 데몬 # 업그레이드 # 버전 # Ver.2