diff --git a/cln-rpc/examples/getinfo.rs b/cln-rpc/examples/getinfo.rs
index 2ca02e76b..a6dd27989 100644
--- a/cln-rpc/examples/getinfo.rs
+++ b/cln-rpc/examples/getinfo.rs
@@ -13,6 +13,6 @@ async fn main() -> Result<(), anyhow::Error> {
     let mut rpc = ClnRpc::new(p).await?;
     let response = rpc.call(Request::Getinfo(GetinfoRequest {})).await?;
-    info!("{}", serde_json::to_string_pretty(&response)?);
+    println!("{}", serde_json::to_string_pretty(&response)?);
     Ok(())
 }
 
diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml
index 49fbc6db0..8acda0435 100644
--- a/plugins/Cargo.toml
+++ b/plugins/Cargo.toml
@@ -10,11 +10,11 @@ path = "examples/cln-plugin-startup.rs"
 [dependencies]
 anyhow = "1.0.51"
 bytes = "1.1.0"
-log = "0.4.14"
+log = { version = "0.4.14", features = ['std'] }
 serde = { version = "1.0.131", features = ["derive"] }
 serde_json = "1.0.72"
 tokio-util = { version = "0.6.9", features = ["codec"] }
-tokio = { version="1", features = ['io-std', 'rt'] }
+tokio = { version="1", features = ['io-std', 'rt', 'sync'] }
 tokio-stream = "*"
 futures = "0.3"
 cln-rpc = { path = "../cln-rpc" }
diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs
index ca43e567b..4568baccd 100644
--- a/plugins/src/lib.rs
+++ b/plugins/src/lib.rs
@@ -1,16 +1,19 @@
 use crate::codec::{JsonCodec, JsonRpcCodec};
-use futures::sink::SinkExt;
-use std::sync::Arc;
-use tokio::sync::Mutex;
-use tokio_util::codec::FramedWrite;
-pub mod codec;
-mod messages;
 pub use anyhow::Error;
+use futures::sink::SinkExt;
+extern crate log;
 use log::{trace, warn};
 use std::marker::PhantomData;
+use std::sync::Arc;
 use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::sync::Mutex;
 use tokio_stream::StreamExt;
 use tokio_util::codec::FramedRead;
+use tokio_util::codec::FramedWrite;
+
+pub mod codec;
+pub mod logging;
+mod messages;
 
 #[macro_use]
 extern crate serde_json;
@@ -50,19 +53,19 @@ where
         }
     }
 
-    pub async fn run(self) -> Result<(), Error> {
-        let (plugin, input) = self.build();
-        plugin.run(input).await
-    }
-
     pub fn build(self) -> (Plugin<S, I, O>, I) {
+        let output = Arc::new(Mutex::new(FramedWrite::new(
+            self.output,
+            JsonCodec::default(),
+        )));
+
+        // Now configure the logging, so any `log` call is wrapped
+        // in a JSON-RPC notification and sent to c-lightning
+        tokio::spawn(async move {});
         (
             Plugin {
                 state: Arc::new(Mutex::new(self.state)),
-                output: Arc::new(Mutex::new(FramedWrite::new(
-                    self.output,
-                    JsonCodec::default(),
-                ))),
+                output,
                 input_type: PhantomData,
             },
             self.input,
@@ -74,7 +77,7 @@ pub struct Plugin<S, I, O>
 where
     S: Clone + Send,
     I: AsyncRead,
-    O: Send + AsyncWrite,
+    O: Send + AsyncWrite + 'static,
 {
     //input: FramedRead<I, JsonRpcCodec>,
     output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
@@ -87,11 +90,14 @@ impl<S, I, O> Plugin<S, I, O>
 where
     S: Clone + Send,
     I: AsyncRead + Send + Unpin,
-    O: Send + AsyncWrite + Unpin,
+    O: Send + AsyncWrite + Unpin + 'static,
 {
     /// Read incoming requests from `c-lightning and dispatch their handling.
     #[allow(unused_mut)]
     pub async fn run(mut self, input: I) -> Result<(), Error> {
+        crate::logging::init(self.output.clone()).await?;
+        trace!("Plugin logging initialized");
+
         let mut input = FramedRead::new(input, JsonRpcCodec::default());
         loop {
             match input.next().await {
diff --git a/plugins/src/logging.rs b/plugins/src/logging.rs
new file mode 100644
index 000000000..7a90b84aa
--- /dev/null
+++ b/plugins/src/logging.rs
@@ -0,0 +1,92 @@
+use crate::codec::JsonCodec;
+use futures::SinkExt;
+use log::{Level, Metadata, Record};
+use serde::Serialize;
+use std::sync::Arc;
+use tokio::io::AsyncWrite;
+use tokio::sync::Mutex;
+use tokio_util::codec::FramedWrite;
+
+#[derive(Clone, Debug, Serialize)]
+#[serde(rename_all = "lowercase")]
+struct LogEntry {
+    level: LogLevel,
+    message: String,
+}
+
+#[derive(Clone, Debug, Serialize)]
+#[serde(rename_all = "lowercase")]
+enum LogLevel {
+    Debug,
+    Info,
+    Warn,
+    Error,
+}
+
+impl From<log::Level> for LogLevel {
+    fn from(lvl: log::Level) -> Self {
+        match lvl {
+            log::Level::Error => LogLevel::Error,
+            log::Level::Warn => LogLevel::Warn,
+            log::Level::Info => LogLevel::Info,
+            log::Level::Debug | log::Level::Trace => LogLevel::Debug,
+        }
+    }
+}
+
+/// A simple logger that just wraps log entries in a JSON-RPC
+/// notification and delivers it to `lightningd`.
+struct PluginLogger {
+    // An unbounded mpsc channel we can use to talk to the
+    // flusher. This avoids having circular locking dependencies if we
+    // happen to emit a log record while holding the lock on the
+    // plugin connection.
+    sender: tokio::sync::mpsc::UnboundedSender<LogEntry>,
+}
+
+/// Initialize the logger starting a flusher to the passed in sink.
+pub async fn init<O>(out: Arc<Mutex<FramedWrite<O, JsonCodec>>>) -> Result<(), log::SetLoggerError>
+where
+    O: AsyncWrite + Send + Unpin + 'static,
+{
+    let out = out.clone();
+    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<LogEntry>();
+    tokio::spawn(async move {
+        while let Some(i) = receiver.recv().await {
+            // We continue draining the queue, even if we get some
+            // errors when forwarding. Forwarding could break due to
+            // an interrupted connection or stdout being closed, but
+            // keeping the messages in the queue is a memory leak.
+            let _ = out
+                .lock()
+                .await
+                .send(json!({
+                    "jsonrpc": "2.0",
+                    "method": "log",
+                    "params": i
+                }))
+                .await;
+        }
+    });
+    log::set_boxed_logger(Box::new(PluginLogger { sender }))
+        .map(|()| log::set_max_level(log::LevelFilter::Debug))
+}
+
+impl log::Log for PluginLogger {
+    fn enabled(&self, metadata: &Metadata) -> bool {
+        metadata.level() <= Level::Debug
+    }
+
+    fn log(&self, record: &Record) {
+        if self.enabled(record.metadata()) {
+            self.sender
+                .send(LogEntry {
+                    level: record.level().into(),
+                    message: record.args().to_string(),
+                })
+                .unwrap();
+        }
+    }
+
+    fn flush(&self) {}
+}