Merge pull request #1 from vincenzopalazzo/macros/async_101

Include the final complete example
This commit is contained in:
Vincenzo Palazzo 2023-01-27 18:35:16 +01:00 committed by GitHub
commit 1d07080f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1571 additions and 24 deletions

54
.github/workflows/build.yml vendored Normal file
View File

@ -0,0 +1,54 @@
name: Sanity Check codebase
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  check:
    name: Build
    runs-on: ubuntu-latest
    strategy:
      matrix:
        node: [nightly]
    steps:
      - name: Checkout sources
        uses: actions/checkout@v2
      - name: Install toolchain
        uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: ${{ matrix.node }}
          override: true
      - name: Run cargo check
        uses: actions-rs/cargo@v1
        with:
          command: check
      - name: Run cargo test
        # CSV_PATH is read by the test suite; NOTE(review): confirm the
        # `specs` directory exists in the repository root.
        run: export CSV_PATH=${GITHUB_WORKSPACE}/specs && cargo test
  lints:
    name: Lints
    runs-on: ubuntu-latest
    steps:
      - name: Checkout sources
        uses: actions/checkout@v2
      - name: Install stable toolchain
        uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
          components: rustfmt, clippy
      - name: Run cargo fmt
        uses: actions-rs/cargo@v1
        with:
          command: fmt
          args: --all -- --check
      # The clippy component is installed above but was never run; add it so
      # the Lints job actually lints.
      - name: Run cargo clippy
        uses: actions-rs/cargo@v1
        with:
          command: clippy
          args: --all -- -D warnings

1284
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,4 +4,5 @@ members = [
"custom_rio",
"rust_101",
"tutotrial_cli",
"asyc_101",
]

11
asyc_101/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
# Example crate for the async-101 lesson: pings the GitHub API through the
# hand-rolled `custom_rio` executor.
# NOTE(review): "asyc_101" looks like a typo for "async_101" — renaming would
# require moving the directory and updating the workspace members list.
[package]
name = "asyc_101"
version = "0.1.0"
edition = "2021"

[dependencies]
# HTTP client used for the GitHub request; the feature selects the h1-client backend.
surf = { version = "2.3.2", features = [ "h1-client" ] }
# The toy async runtime implemented in this workspace.
custom_rio = { path = "../custom_rio" }
# Logging facade + env-driven logger initialized by the tests.
log = "0.4"
env_logger = "0.9.1"

44
asyc_101/src/lib.rs Normal file
View File

@ -0,0 +1,44 @@
#![allow(dead_code)]
use log::{debug, info};
use surf;
/// Fetch the GitHub "octocat" endpoint and return the response body.
///
/// # Errors
/// Propagates any `surf::Error` from the request or from reading the body.
async fn ping_github() -> Result<String, surf::Error> {
    debug!("Running the https request");
    let mut response = surf::get("https://api.github.com/octocat").await?;
    let payload = response.body_string().await?;
    info!("{}", payload);
    Ok(payload)
}
#[cfg(test)]
mod tests {
    use std::sync::Once;

    use custom_rio::runtime::Runtime;
    use custom_rio::CustomRio;

    use crate::ping_github;

    static INIT: Once = Once::new();

    /// Initialize the logger exactly once for the whole test binary.
    fn init() {
        // ignore error
        INIT.call_once(env_logger::init);
    }

    #[test]
    fn safety_test_not_running() {
        init();
        // Futures are lazy: constructing one without awaiting it runs nothing.
        let _ = ping_github();
    }

    #[test]
    fn safety_test_async_running() {
        init();
        CustomRio::block_on(async {
            let _ = ping_github().await.unwrap();
        });
    }
}

View File

@ -1,14 +1,10 @@
/// Return the sum of the two given unsigned sizes.
pub fn add(left: usize, right: usize) -> usize {
    right + left
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for `add`: 2 + 2 == 4.
    #[test]
    fn it_works() {
        assert_eq!(add(2, 2), 4);
    }
}
//! Custom rio: a live-coded reproduction of
//! my experimental runtime RIO
//! that lives at https://github.com/vincenzopalazzo/rio/tree/main/rt/src
//!
//! author: Vincenzo Palazzo <vincenzopalazzodev@gmail.com>
// `LazyLock` (used by the runtime implementation) was gated behind the
// `once_cell` feature at the time of writing, so this crate needs nightly.
#![feature(once_cell)]
pub mod runtime;
mod runtime_impl;
pub mod task;
pub use runtime_impl::CustomRio;

View File

@ -0,0 +1,9 @@
//! Here lives the runtime interface of the crate.
// code source https://github.com/vincenzopalazzo/rio/pull/15/files#diff-0fa9d453eacf7c8cfca82ff169556a166e1c2fec6bee4132c16e0bac76c094c7
use std::future::Future;

/// Minimal executor interface implemented by the crate's runtime.
pub trait Runtime {
    /// Return a handle to the runtime instance (a `'static` singleton).
    fn new() -> &'static Self;
    /// Drive `future` on the runtime, blocking the calling thread until done.
    fn block_on(future: impl Future<Output = ()> + Send + 'static);
}

View File

@ -0,0 +1,89 @@
use crate::runtime::Runtime;
use crate::task::Task;
use std::collections::LinkedList;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use std::task::Poll;
// NOTE(review): a linked list behind a single Mutex is the simplest possible
// scheduler queue; a lock-free deque would reduce contention — how can we
// improve it?
pub(crate) type Queue = Arc<Mutex<LinkedList<Arc<Task>>>>;
/// Global runtime singleton, created lazily on first access.
pub(crate) static INSTANCE: LazyLock<CustomRio> = LazyLock::new(|| CustomRio::new());
/// A toy executor backed by a shared FIFO queue of tasks.
pub struct CustomRio {
    /// Queue of runnable tasks shared with wakers.
    pub(crate) task_queue: Queue,
    /// Number of live tasks: incremented in `Task::new`, decremented in
    /// `Task`'s `Drop` impl.
    pub(crate) size_queue: AtomicUsize,
}
impl CustomRio {
    /// Build the runtime: spawn the executor thread, then create the queue.
    fn new() -> Self {
        CustomRio::start();
        // The `queue` local was previously cloned via `to_owned()` for no
        // reason; it is used exactly once, so move it in directly.
        CustomRio {
            task_queue: Arc::new(Mutex::new(LinkedList::new())),
            size_queue: AtomicUsize::new(0),
        }
    }

    /// Start the runtime by spawning the event loop on a dedicated thread.
    fn start() {
        std::thread::spawn(|| loop {
            match CustomRio::get().pop_front() {
                Some(task) => match task.poll() {
                    // Completed: nothing more to do for this task.
                    Poll::Ready(()) => {}
                    // Pending: the task re-queues itself via its waker.
                    Poll::Pending => {}
                },
                // Yield instead of hot-spinning while the queue is empty.
                None => std::thread::yield_now(),
            }
        });
    }

    /// Handle to the global runtime singleton.
    pub fn get() -> &'static CustomRio {
        INSTANCE.deref()
    }

    /// Pop the next runnable task, if any.
    pub(crate) fn pop_front(&self) -> Option<Arc<Task>> {
        self.task_queue.lock().unwrap().pop_front()
    }

    /// This is the function that gets called by the `spawn` function to
    /// actually create a new `Task` in our queue. It takes the `Future`,
    /// constructs a `Task` and then pushes it to the back of the queue.
    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.inner_spawn(Task::new(false, future));
    }

    /// This is the function that gets called by the `spawn_blocking` function to
    /// actually create a new `Task` in our queue. It takes the `Future`,
    /// constructs a `Task` and then pushes it to the front of the queue
    /// where the runtime will check if it should block and then block until
    /// this future completes.
    pub fn spawn_blocking(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.inner_spawn_blocking(Task::new(true, future));
    }

    /// This function just takes a `Task` and pushes it onto the queue. We use this
    /// both for spawning new `Task`s and to push old ones that get woken up
    /// back onto the queue.
    pub(crate) fn inner_spawn(&self, task: Arc<Task>) {
        self.task_queue.lock().unwrap().push_back(task);
    }

    /// This function takes a `Task` and pushes it to the front of the queue
    /// if it is meant to block. We use this both for spawning new blocking
    /// `Task`s and to push old ones that get woken up back onto the queue.
    pub(crate) fn inner_spawn_blocking(&self, task: Arc<Task>) {
        self.task_queue.lock().unwrap().push_front(task);
    }
}
impl Runtime for CustomRio {
    /// Return the global runtime instance. The name `new` comes from the
    /// trait; the runtime itself is a lazily-created singleton.
    fn new() -> &'static Self {
        CustomRio::get()
    }

    /// Queue `future` at the front of the task queue and wait until the
    /// runtime has no live tasks left.
    ///
    /// NOTE(review): this waits for the WHOLE queue to drain, not just for
    /// `future` — confirm that is the intended semantics.
    fn block_on(future: impl std::future::Future<Output = ()> + Send + 'static) {
        CustomRio::get().spawn_blocking(future);
        // Yield while spin-waiting so this thread does not monopolize a core.
        while CustomRio::get().size_queue.load(Ordering::Relaxed) > 0 {
            std::thread::yield_now();
        }
    }
}

79
custom_rio/src/task.rs Normal file
View File

@ -0,0 +1,79 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use crate::CustomRio;
/// A pinned, boxed future guarded by a mutex so the executor thread can
/// poll it through a shared `Arc<Task>`.
type PinFuture = Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>;
/// The `Task` is the basic unit for the executor. It represents a `Future`
/// that may or may not be completed. We spawn `Task`s to be run and poll
/// them until completion in a non-blocking manner unless specifically asked
/// for.
pub(crate) struct Task {
    /// This is the actual `Future` we will poll inside of a `Task`. We `Box`
    /// and `Pin` the `Future` when we create a task so that we don't need
    /// to worry about pinning or more complicated things in the runtime.
    future: PinFuture,
    /// We need a way to check if the runtime should block on this task and
    /// so we use a boolean here to check that!
    block: bool,
    // When `Some`, a waker has already been registered and the future is
    // being driven in the background.
    // NOTE(review): nothing in this file ever assigns `Some` to this field,
    // so `poll` always takes its polling branch — confirm intent.
    waker: Option<Arc<Waker>>,
}
impl Wake for Task {
    /// Re-queue the task on the global runtime when its waker fires:
    /// blocking tasks go to the front of the queue, others to the back.
    fn wake(self: Arc<Self>) {
        if self.is_blocking() {
            CustomRio::get().inner_spawn_blocking(self);
        } else {
            CustomRio::get().inner_spawn(self);
        }
    }
}
impl Drop for Task {
    /// Balance the `fetch_add` in `Task::new`: decrement the runtime's
    /// live-task counter when the task is finally dropped.
    fn drop(&mut self) {
        CustomRio::get().size_queue.fetch_sub(1, Ordering::Relaxed);
    }
}
impl Task {
    /// Wrap `future` in a reference-counted `Task` and bump the runtime's
    /// live-task counter (balanced by this type's `Drop` impl).
    pub(crate) fn new(block: bool, future: impl Future<Output = ()> + Send + 'static) -> Arc<Self> {
        CustomRio::get().size_queue.fetch_add(1, Ordering::Relaxed);
        Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            block,
            waker: None,
        })
    }

    /// Poll the wrapped future once.
    ///
    /// See more https://cfsamson.github.io/books-futures-explained/3_waker_context.html
    pub fn poll(self: &Arc<Self>) -> Poll<()> {
        // If a waker already exists there is no need to poll again: the
        // future is already being driven in the background.
        // NOTE(review): `waker` is never set to `Some` anywhere in this
        // file, so this branch is always taken — confirm whether storing
        // the waker was intended.
        if self.waker.is_none() {
            let waker = self.waker();
            let mut ctx = Context::from_waker(&waker);
            // FIXME: is this the good place where to remove the element
            // from the queue?
            self.future.lock().unwrap().as_mut().poll(&mut ctx)
        } else {
            Poll::Pending
        }
    }

    /// Build a `Waker` backed by this task's `Wake` implementation.
    pub fn waker(self: &Arc<Self>) -> Waker {
        self.clone().into()
    }

    /// Whether the runtime should block on this task.
    pub fn is_blocking(&self) -> bool {
        self.block
    }
}